Infinite automatic Kafka offset resetting #11658
Comments
@FrankChen021 - I believe the code is doing the right thing. It may be doing an unnecessary seek, but it shouldn't cause an infinite retry loop.
So it gets the current position for the partition, seeks to the earliest offset in that partition, gets the position there, and then seeks back to the position it was at. The earliest offset is returned and stored in … Then, at line 148 …
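That seek-and-restore pattern, as I read it, would look roughly like this against the plain `KafkaConsumer` API (a minimal sketch, not the actual Druid code):

```java
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class EarliestOffsetProbe
{
  // Returns the earliest available offset for a partition without losing
  // the consumer's place: remember the current position, seek to the
  // beginning, read the position there, then seek back.
  static long getLeastAvailableOffset(KafkaConsumer<?, ?> consumer, TopicPartition tp)
  {
    final long currentPosition = consumer.position(tp);
    consumer.seekToBeginning(Collections.singletonList(tp));
    final long leastAvailableOffset = consumer.position(tp);
    consumer.seek(tp, currentPosition); // restore where we were
    return leastAvailableOffset;
  }
}
```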
As you can see, two seeks were performed to seek to offset 22400192044. Having said that, I recently ran into an issue where even though I have … I wonder if you also have run into something similar?
@samarthjain You have run into the exact problem I described in this issue. Since the …
Notice that here … So the Kafka ingestion resets again and again; this is what I mean by an "infinite loop". This problem usually happens when the messages in a Kafka partition expire before Druid reads them.
@FrankChen021 - I don't think the code is entering that block, since the earliest offset (2944387224) is less than the current offset (22400192044). There is an extra digit in the current offset. I am not exactly sure how the current offset can be so much higher than the earliest offset, but that seemed to be the case here.
@samarthjain Sorry, those two numbers are so long that I mistook the earliest offset for the larger one. In your case, I bet the latest offset is also less than your current offset. I have never encountered such a problem on the Kafka side when there is no change to the topic. There is one scenario I can come up with that would lead to it: after consuming a topic for a while, the topic is deleted and then re-created, so the log offsets on the Kafka side start again from zero, and both the earliest and latest offsets end up less than the offset used on the consumer side. So, for such a case, I think we could check whether the latest offset is also less than the current offset, and if it is, maybe we should reset the offset to the latest.
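Such a check could look roughly like the following (a minimal sketch against the plain `KafkaConsumer` API, not actual Druid code):

```java
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetRangeCheck
{
  // If the stored offset is beyond the latest offset in Kafka (e.g. the
  // topic was deleted and re-created), resetting to the earliest offset
  // is not enough; reset to the latest offset instead.
  static long chooseResetOffset(KafkaConsumer<?, ?> consumer, TopicPartition tp, long currentOffset)
  {
    Map<TopicPartition, Long> begin = consumer.beginningOffsets(Collections.singletonList(tp));
    Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singletonList(tp));
    long earliest = begin.get(tp);
    long latest = end.get(tp);
    if (currentOffset > latest) {
      return latest;    // offsets went "backwards", e.g. topic re-created
    }
    if (currentOffset < earliest) {
      return earliest;  // messages expired before we read them
    }
    return currentOffset; // still in range, no reset needed
  }
}
```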
So I know now how we ended up with an offset that was significantly higher than the earliest offset, and the reason is similar to what you mentioned, @FrankChen021. The first few versions of the supervisor were talking to Kafka Cluster A. Then, the stream was recreated on Kafka Cluster B and the spec updated accordingly. As a result, the offsets that Druid had stored for this stream/datasource were no longer valid. We should have stopped the supervisor, waited for all handoffs to complete, and then cleared all offsets in the DB before resubmitting the new spec with the updated Kafka cluster endpoint. In general, updating streams, clusters, or the number of partitions is problematic with Druid's Kafka ingestion and invariably involves clearing the offsets stored in the DB.
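For reference, the stored offsets can be cleared through the supervisor's hard-reset endpoint (`POST /druid/indexer/v1/supervisor/{id}/reset`). A minimal sketch of invoking it, with a hypothetical Overlord address and supervisor id:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SupervisorReset
{
  public static void main(String[] args) throws Exception
  {
    // Hypothetical values: substitute your Overlord address and the
    // datasource's supervisor id.
    String overlord = "http://overlord.example.com:8090";
    String supervisorId = "my-kafka-datasource";

    // POST /druid/indexer/v1/supervisor/{id}/reset clears the offsets
    // stored in the metadata store for this supervisor (a "hard reset").
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(overlord + "/druid/indexer/v1/supervisor/" + supervisorId + "/reset"))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();

    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + " " + response.body());
  }
}
```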
@samarthjain Right, updating Kafka clusters is problematic. I'm wondering if it's possible to clear the stored offsets automatically when the supervisor spec is updated and the Kafka cluster/topic has changed in the new spec, so that the offset problem isn't left to a manual or automatic reset.
My Druid + Kafka cluster running on Docker Swarm went down due to a network failure. Even though the cluster got back up and running, Druid got stuck in the "resetting offset automatically" infinite loop. It happened because Druid was trying to read from a higher offset than was present in Kafka. The … Has anyone found a solution?
Hello, I'm facing the same issue, and it seems it has not been solved yet.
Facing the same issue. I can verify that the offsets reported by the logs are not ahead of the Kafka offsets for each partition.
Is this resolved now? I'm facing the same issue, but I don't want to duplicate data or risk data loss by hard resetting. Please let me know if this is fixed. I don't have any leeway to increase the task count, so is there any other solution, @FrankChen021?
Affected Version
Since 0.16
Description
There's a configuration `resetOffsetAutomatically` in `KafkaIndexTaskTuningConfig` that allows the Kafka offset to be reset automatically once it is out of range. The out-of-range error typically occurs when messages in Kafka expire before the Druid ingestion task reads them. But the current automatic-reset implementation uses the wrong offset for the reset. As a result, the reset does not take effect and causes another out-of-range error, after which the automatic reset is triggered again. The ingestion task falls into an infinite loop.
Problem Analysis
druid/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java, lines 134 to 155 at commit 59d2578
From the code above (lines 148 and 154) we can see that a variable `nextOffset` is used for the automatic reset. But this variable holds the offset we are currently reading from Kafka, which is exactly the offset that caused the out-of-range exception (line 134). In other words, the automatic reset seeks back to the offset that was already out of range. Of course, this reset doesn't help and triggers another out-of-range exception in the next round of polling messages from Kafka.
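To make the failure mode concrete, here is a simplified sketch of the behavior described above (an illustration against the plain `KafkaConsumer` API, not the actual Druid source):

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;

public class BrokenAutoReset
{
  // nextOffset is the offset we were already trying to read -- the very
  // offset that triggered OffsetOutOfRangeException in the first place.
  static ConsumerRecords<byte[], byte[]> pollOnce(
      KafkaConsumer<byte[], byte[]> consumer,
      TopicPartition tp,
      long nextOffset
  )
  {
    while (true) {
      try {
        return consumer.poll(Duration.ofSeconds(5));
      } catch (OffsetOutOfRangeException e) {
        // "Automatic reset": seeks right back to the offset that just
        // failed, so the next poll() throws again -- an endless loop.
        consumer.seek(tp, nextOffset);
      }
    }
  }
}
```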
How to fix
To fix this problem, the `leastAvailableOffset` variable should be used to reset the offset. Since there's a check (line 152) that guarantees `leastAvailableOffset` is greater than the current reading offset, the automatic reset also won't cause data duplication. The fix looks roughly like the sketch below. I will open a PR to fix this.