
heartbeat expiration when topic is idle #220

Open
bergeraccso opened this issue Aug 25, 2023 · 0 comments

Hi,

I'm not sure whether this is a bug in the library, an implementation error on my part, or simply intended behaviour.
I'm building a containerized demo app in C++. One container produces 10 messages and then shuts down. The other container stays online and consumes indefinitely.

Consider this consumer:

Properties consumer_properties({
    {"bootstrap.servers", EnvConfig().kafka_config.brokers()},
    {"group.id", EnvConfig().kafka_config.group_id()},
    {"auto.offset.reset", EnvConfig().kafka_config.auto_offset_reset()}
    });

KConsumer::KConsumer(Topic _topic){
    topic = _topic;
}

void KConsumer::subscribe(std::function<void(std::string)> handle_message){
    // Use Ctrl-C to terminate the program
    signal(SIGINT, stopRunning);    // NOLINT

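    // Create the consumer and subscribe to the topic (wait up to 30 s for the
    // subscription to complete); partitions are assigned by the group coordinator
    KafkaConsumer consumer(consumer_properties);
    consumer.subscribe({topic}, NullRebalanceCallback, std::chrono::milliseconds(30000));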

    // consume loop
    while (running) {
        // Poll messages from Kafka brokers
        auto records = consumer.poll(std::chrono::milliseconds(100));
        /*
            FIXME: if no new records arrive for a while, the consumer is kicked out of the consumer group due to missing heartbeats.
            Not sure whether this is a shortcoming on the framework's side or a mistake in our implementation.
            However, the consumer resumes after a rebalance, so maybe this is intended behaviour?
        */

        for (const auto& record: records) {
            if (!record.error()) {
                //TODO do this on debug/trace level
                std::cout << "Got a new message..." << std::endl;
                std::cout << "    Topic    : " << record.topic() << std::endl;
                std::cout << "    Partition: " << record.partition() << std::endl;
                std::cout << "    Offset   : " << record.offset() << std::endl;
                std::cout << "    Timestamp: " << record.timestamp().toString() << std::endl;
                std::cout << "    Headers  : " << toString(record.headers()) << std::endl;
                std::cout << "    Key   [" << record.key().toString() << "]" << std::endl;
                std::cout << "    Value [" << record.value().toString() << "]" << std::endl;
                handle_message(record.value().toString()); // invoke the callback
            } else {
                std::cerr << record.toString() << std::endl;
            }
        }
        //consumer.commitSync(); // not needed: enable.auto.commit is on by default
    }
}
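
For context, the consumer above does not set any of the properties that govern group liveness, so the client defaults apply. The sketch below lists the three librdkafka property names involved, with illustrative values rather than confirmed defaults, and checks the usual relationships between them. The `std::map` is only a stand-in for the library's `Properties` object, and `checkLivenessConfig` and `exampleLivenessConfig` are hypothetical helpers, not part of any library. The stated rules (heartbeat interval below the session timeout, ideally no more than a third of it, and a poll interval no shorter than the session timeout) are assumptions based on the general Kafka client documentation.

```cpp
#include <map>
#include <string>
#include <vector>

using Config = std::map<std::string, std::string>;

// Illustrative values only; they are not read from the issue's environment.
Config exampleLivenessConfig() {
    return {
        {"session.timeout.ms",    "45000"},   // broker drops the member after this long without a heartbeat
        {"heartbeat.interval.ms", "3000"},    // how often the client is expected to send heartbeats
        {"max.poll.interval.ms",  "300000"},  // maximum allowed gap between two poll() calls
    };
}

// Hypothetical helper: returns a list of problems, empty if the settings look consistent.
std::vector<std::string> checkLivenessConfig(const Config& cfg) {
    std::vector<std::string> problems;
    const long session   = std::stol(cfg.at("session.timeout.ms"));
    const long heartbeat = std::stol(cfg.at("heartbeat.interval.ms"));
    const long maxPoll   = std::stol(cfg.at("max.poll.interval.ms"));

    if (heartbeat >= session) {
        problems.push_back("heartbeat.interval.ms must be lower than session.timeout.ms");
    } else if (heartbeat * 3 > session) {
        problems.push_back("heartbeat.interval.ms is more than a third of session.timeout.ms");
    }
    if (maxPoll < session) {
        problems.push_back("max.poll.interval.ms is shorter than session.timeout.ms");
    }
    return problems;
}
```

Calling `checkLivenessConfig(exampleLivenessConfig())` returns an empty list. In the real consumer, the same keys would go into `consumer_properties` next to `bootstrap.servers`.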

Now, what happens after starting the producer and the consumer is this: once the last message has been consumed, the consumer is kicked out of the consumer group after the heartbeat timeout interval, and a rebalance is triggered.

Container logs:

<...>
kafka-plot-interface-utp-sink-connector-1 | [2023-08-22 08:44:06.845688]NOTICE KafkaConsumer[2d773c79-c6f4ba27] re-balance event triggered[ASSIGN_PARTITIONS], cooperative[disabled], topic-partitions[test-0]
kafka-plot-interface-utp-sink-connector-1 | [2023-08-22 08:44:06.846237]NOTICE KafkaConsumer[2d773c79-c6f4ba27] subscribed, topics[test]
<...>
kafka-plot-interface-utp-sink-connector-1 | Got a new message...
kafka-plot-interface-utp-sink-connector-1 | Topic : test
kafka-plot-interface-utp-sink-connector-1 | Partition: 0
kafka-plot-interface-utp-sink-connector-1 | Offset : 75
kafka-plot-interface-utp-sink-connector-1 | Timestamp: CreateTime[2023-08-22 08:43:34.972]
kafka-plot-interface-utp-sink-connector-1 | Headers :
kafka-plot-interface-utp-sink-connector-1 | Key [[null]]
kafka-plot-interface-utp-sink-connector-1 | Value [Hello, World 10]
kafka-plot-interface-utp-sink-connector-1 | broadcasting message: Hello, World 10
kafka-plot-interface-utp-sink-connector-1 | message sent successfully

kafka-plot-interface-kafka-1 | [2023-08-22 08:44:51,793] INFO [GroupCoordinator 0]: Member ffb336d6-3f9b5c63-b26e474b-24ef-433e-84fb-a5e4e9468d26 in group utp_sink_connector has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
kafka-plot-interface-kafka-1 | [2023-08-22 08:44:51,794] INFO [GroupCoordinator 0]: Preparing to rebalance group utp_sink_connector in state PreparingRebalance with old generation 5 (__consumer_offsets-37) (reason: removing member ffb336d6-3f9b5c63-b26e474b-24ef-433e-84fb-a5e4e9468d26 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
kafka-plot-interface-utp-sink-connector-1 | [2023-08-22 08:44:51.851106]NOTICE KafkaConsumer[2d773c79-c6f4ba27] re-balance event triggered[REVOKE_PARTITIONS], cooperative[disabled], topic-partitions[test-0]
kafka-plot-interface-kafka-1 | [2023-08-22 08:44:51,852] INFO [GroupCoordinator 0]: Stabilized group utp_sink_connector generation 6 (__consumer_offsets-37) with 1 members (kafka.coordinator.group.GroupCoordinator)
kafka-plot-interface-kafka-1 | [2023-08-22 08:44:51,857] INFO [GroupCoordinator 0]: Assignment received from leader 2d773c79-c6f4ba27-00c8b2f2-89bf-45ab-9441-bbb2c0586bdf for group utp_sink_connector for generation 6. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
kafka-plot-interface-utp-sink-connector-1 | [2023-08-22 08:44:51.858916]NOTICE KafkaConsumer[2d773c79-c6f4ba27] re-balance event triggered[ASSIGN_PARTITIONS], cooperative[disabled], topic-partitions[test-0]
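
To put a number on the timeout seen in these logs, the gap between the first `ASSIGN_PARTITIONS` event (08:44:06.846237) and the broker's "has failed" line (08:44:51,793) can be computed directly. The sketch below does that arithmetic. It assumes both containers share the same clock and that the two lines are the right reference points; `parseTimeOfDay` and `elapsedBetween` are hypothetical helpers written for this example.

```cpp
#include <chrono>
#include <cstdio>
#include <stdexcept>
#include <string>

// Parse "HH:MM:SS.mmm" or "HH:MM:SS,mmm" into milliseconds since midnight.
// Digits beyond the third fractional digit (e.g. microseconds) are ignored.
std::chrono::milliseconds parseTimeOfDay(const std::string& text) {
    int h = 0, m = 0, s = 0, ms = 0;
    char sep = '\0';
    if (std::sscanf(text.c_str(), "%d:%d:%d%c%3d", &h, &m, &s, &sep, &ms) != 5
        || (sep != '.' && sep != ',')) {
        throw std::invalid_argument("unexpected timestamp: " + text);
    }
    return std::chrono::hours(h) + std::chrono::minutes(m)
         + std::chrono::seconds(s) + std::chrono::milliseconds(ms);
}

// Elapsed time between two timestamps taken on the same day.
std::chrono::milliseconds elapsedBetween(const std::string& from, const std::string& to) {
    return parseTimeOfDay(to) - parseTimeOfDay(from);
}
```

`elapsedBetween("08:44:06.846237", "08:44:51,793")` yields 44947 ms, i.e. just under 45 s. That would be consistent with a session timeout of about 45 s, but whether that is the effective `session.timeout.ms` here is an assumption, since the consumer properties above do not set it.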

So, TL;DR:
I think consumers should send heartbeats in the background, independent of their consumption loops. Do they not?
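
To make the expectation concrete, here is a toy model of a heartbeat that runs on its own thread while the consumption loop sits idle. It is not the library's implementation and makes no claim about what the library actually does; `BackgroundHeartbeat` and `heartbeatsWhileIdle` are hypothetical names, and incrementing a counter stands in for sending a real heartbeat request.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Toy model: "sends" a heartbeat every `interval` on a dedicated thread.
class BackgroundHeartbeat {
public:
    explicit BackgroundHeartbeat(std::chrono::milliseconds interval)
        : interval_(interval), worker_([this] { run(); }) {}

    // Signal the worker to stop and wait for it to finish.
    ~BackgroundHeartbeat() {
        stop_ = true;
        worker_.join();
    }

    int sent() const { return sent_.load(); }

private:
    void run() {
        while (!stop_) {
            ++sent_;  // stand-in for sending a heartbeat to the group coordinator
            std::this_thread::sleep_for(interval_);
        }
    }

    std::chrono::milliseconds interval_;
    std::atomic<bool> stop_{false};
    std::atomic<int> sent_{0};
    std::thread worker_;  // declared last so the members above are initialized first
};

// Simulates a consumption loop that receives no records for `idleFor`
// and reports how many heartbeats went out in the meantime.
int heartbeatsWhileIdle(std::chrono::milliseconds interval,
                        std::chrono::milliseconds idleFor) {
    BackgroundHeartbeat heartbeat(interval);
    std::this_thread::sleep_for(idleFor);  // the "idle topic": nothing to process
    return heartbeat.sent();
}
```

With a 10 ms interval and a 100 ms idle period, `heartbeatsWhileIdle` reports several heartbeats even though the loop itself did no work, which is the behaviour I would expect from the consumer.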
