
Kafka with topicPattern can ignore old offsets spuriously #16190

Merged · 18 commits · Apr 17, 2024

Conversation

@zachjsh (Contributor) commented Mar 22, 2024

Fixes #16189

Description

This fixes an issue in which updating a Kafka streaming supervisor's topic from single-topic to multi-topic (pattern), or vice versa, could cause old offsets to be ignored spuriously. The details of the bug and how it manifests are in the linked issue. To fix it, this change properly handles previously stored partition offsets regardless of whether they were recorded by a supervisor with multi-topic enabled or disabled, and of whether the new supervisor has multi-topic enabled or disabled.
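The core idea of the fix can be sketched as follows. This is a simplified illustration with hypothetical names, not the actual patch: the real code keys offsets by KafkaTopicPartition inside the Kafka supervisor, while here keys are "topic:partition" strings for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Sketch: keep only stored offsets whose topic matches the supervisor's
// current stream config. "stream" is interpreted as a regex (topicPattern)
// when multi-topic is enabled, otherwise as an exact topic name.
class OffsetFilterSketch
{
  static Map<String, Long> filterStoredOffsets(
      Map<String, Long> storedOffsets, String stream, boolean multiTopic)
  {
    Pattern pattern = multiTopic ? Pattern.compile(stream) : null;
    Map<String, Long> filtered = new HashMap<>();
    for (Map.Entry<String, Long> e : storedOffsets.entrySet()) {
      String topic = e.getKey().substring(0, e.getKey().indexOf(':'));
      boolean match = multiTopic
          ? pattern.matcher(topic).matches()
          : stream.equals(topic);
      if (match) {
        filtered.put(e.getKey(), e.getValue());
      }
    }
    return filtered;
  }
}
```

Either way the supervisor is configured, previously stored offsets are consulted and only those belonging to topics outside the new configuration are dropped.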

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@@ -444,4 +447,56 @@ public KafkaSupervisorTuningConfig getTuningConfig()
{
return spec.getTuningConfig();
}

@Override
protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
Contributor:

Instead of overriding this method, you should just override the checkSourceMetadataMatch method.

Contributor:

or rename this method appropriately so that callers know that it's also filtering the spurious stored offsets

Contributor:

Doesn't this fail when updateDataSourceMetadataWithHandle is called later on since that too will match the committed metadata with the new metadata?

Contributor (author):

Updated the logic in KafkaDataSourceMetadata so that it handles topicPattern during the matching check in updateDataSourceMetadataWithHandle. Thanks for pointing this out.

: getIoConfig().getStream().equals(matchValue);

if (!match && !topicMisMatchLogged.contains(matchValue)) {
log.warn(
Contributor:

This could happen when going from multi-topic to single-topic? Will these bad offsets get cleared automatically?

Contributor (author):

Going from multi-topic to single-topic: if the multi-topic sequence numbers contained offsets for streams that do not match the single topic name, the new metadata state will have those sequence offsets removed. However, this causes the matches method in the Kafka metadata to return false, which ultimately leads to a failure to publish segments, just as switching from one single topic to another would when sequence offsets were stored for the first topic. I think this is the behavior we want: users should be explicit when sequence offsets are lost due to a config change, and should be forced to reset the respective offsets, imo. Let me know what you think.

@abhishekrb19 (Contributor) commented Apr 12, 2024:

Yeah, having the user reset the offsets explicitly in this scenario when there's no match at all makes sense to me.
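The behavior discussed in this thread can be illustrated with a simplified sketch (hypothetical names; not Druid's actual KafkaDataSourceMetadata code): publishing proceeds only if every committed topic is still covered by the new stream config, so filtering out offsets forces an explicit reset.

```java
import java.util.Map;
import java.util.regex.Pattern;

// Sketch: the committed metadata "matches" the new config only if every
// topic with committed offsets is still covered, either by the topicPattern
// regex (multi-topic) or by the exact topic name (single-topic). A false
// result here models the publish failure that requires a supervisor reset.
class MetadataMatchSketch
{
  static boolean matches(Map<String, Long> committedByTopic, String newStream, boolean multiTopic)
  {
    Pattern pattern = multiTopic ? Pattern.compile(newStream) : null;
    for (String topic : committedByTopic.keySet()) {
      boolean covered = multiTopic
          ? pattern.matcher(topic).matches()
          : newStream.equals(topic);
      if (!covered) {
        return false;
      }
    }
    return true;
  }
}
```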

? pattern.matcher(matchValue).matches()
: getIoConfig().getStream().equals(matchValue);

if (!match && !topicMisMatchLogged.contains(matchValue)) {
Contributor:

The function seems a bit complex to follow, and there's a risk of introducing bugs due to the multiple if...else conditionals intertwined with multi/single topic logic. Could we maybe add a single block each for the multi topic and single topic logic or make a function each?

Contributor (author):

Simplified; let me know if it's OK now.

@@ -444,4 +447,56 @@ public KafkaSupervisorTuningConfig getTuningConfig()
{
return spec.getTuningConfig();
}

Contributor:

A brief javadoc for this function would be useful and would clarify the need for this override from the base implementation and the filtering behavior

Contributor (author):

added

Comment on lines 71 to 77
if (this.getClass() != other.getClass()) {
throw new IAE(
"Expected instance of %s, got %s",
this.getClass().getName(),
other.getClass().getName()
);
}
Contributor:

nit: adding a function validateSequenceNumbersBaseType() in the base class should remove this code duplication in 4-5 places
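A sketch of the suggested helper. The method name comes from the comment above; everything else is illustrative: the real code would throw Druid's IAE rather than IllegalArgumentException, and the concrete start/end subclasses here are placeholders.

```java
// Sketch: centralize the class-equality check duplicated in 4-5 places
// into one base-class helper that subclasses call before comparing
// sequence numbers.
class SequenceNumbersSketch
{
  protected void validateSequenceNumbersBaseType(SequenceNumbersSketch other)
  {
    if (this.getClass() != other.getClass()) {
      // The real code throws Druid's IAE; IllegalArgumentException stands in.
      throw new IllegalArgumentException(String.format(
          "Expected instance of %s, got %s",
          this.getClass().getName(),
          other.getClass().getName()));
    }
  }

  // Placeholder subclasses standing in for start/end sequence number types.
  static class Start extends SequenceNumbersSketch {}
  static class End extends SequenceNumbersSketch {}
}
```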

Contributor (author):

added

import java.util.regex.Pattern;
import java.util.stream.Collectors;

@JsonTypeName(SeekableStreamEndSequenceNumbers.TYPE)
Contributor:

Same here. How is this Jackson type information used?

Contributor (author):

Added information about serialization to the Javadoc; let me know if that works.

@AmatyaAvadhanula (Contributor) left a comment:

Overall logic and tests LGTM!

@zachjsh merged commit 2351f03 into apache:master Apr 17, 2024
85 checks passed
@zachjsh deleted the kafa-topicPattern-fix branch April 17, 2024 14:00
@adarshsanjeev added this to the 30.0.0 milestone May 6, 2024