Kafka with topicPattern can ignore old offsets spuriously #16189

Closed
zachjsh opened this issue Mar 22, 2024 · 0 comments · Fixed by #16190

zachjsh commented Mar 22, 2024

In SeekableStreamSupervisor::getOffsetsFromMetadataStorage(), we do:

        if (!ioConfig.getStream().equals(partitions.getStream())) {
          log.warn(
              "Topic/stream in metadata storage [%s] doesn't match spec topic/stream [%s], ignoring stored sequences",
              partitions.getStream(),
              ioConfig.getStream()
          );
          return Collections.emptyMap();
        }

This code is inherited by the Kafka supervisor. Here, ioConfig comes from the ingestion spec, and partitions holds the checkpoints stored for this datasource in the metadata store. The check compares ioConfig.getStream() and partitions.getStream() as plain strings, and if they differ, we ignore the stored offsets. This interacts awkwardly with Kafka’s multi-topic support:

ioConfig.getStream() is either topic (name) or topicPattern (pattern), whichever is set. (It’s an error for both to be set.) Therefore, it represents either a single, static topic name or a regular expression pattern.

partitions.getStream() is the ioConfig.getStream() that was used for the last streaming ingestion job on this datasource. As above, this means it can be a name or a pattern.
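To make the mismatch concrete, here is a minimal Java sketch. It is not Druid code: `StreamMatchDemo`, `keptByCurrentCheck`, and `patternSelects` are hypothetical names. It contrasts the exact string comparison from the check above with a regex test of whether a pattern still selects the previously used topic, assuming a topicPattern is matched against the whole topic name, as `java.util.regex.Pattern.matches` does.

```java
import java.util.regex.Pattern;

/**
 * Hypothetical illustration (not Druid code): compares the exact string check
 * with a regex-aware view of whether the old topic is still selected.
 */
final class StreamMatchDemo {

  /** Mirrors the existing check: stored offsets are kept only on an exact string match. */
  static boolean keptByCurrentCheck(String specStream, String storedStream) {
    return specStream.equals(storedStream);
  }

  /** Assumption: a topicPattern must match the entire topic name (full-string regex match). */
  static boolean patternSelects(String topicPattern, String topicName) {
    return Pattern.matches(topicPattern, topicName);
  }

  public static void main(String[] args) {
    // topic: vehicle -> topicPattern: vehicle.*
    System.out.println(keptByCurrentCheck("vehicle.*", "vehicle")); // false: stored offsets ignored
    System.out.println(patternSelects("vehicle.*", "vehicle"));     // true: same topic still selected
  }
}
```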

As a result, stored offsets are ignored in several situations where they are still relevant, which can cause inadvertent data duplication:

Converting from a topic name to a pattern, even if the pattern matches the previous name. For example, converting from topic: vehicle to topicPattern: vehicle.*.

The inverse situation, converting from a topic pattern to a name, even if the name matches the previous pattern. For example, converting from topicPattern: topicName.* to topic: topicName.

Changing the pattern when the sets of topics selected by the old and new patterns overlap. For example, changing from topicPattern: foo-.* to topicPattern: foo-x-.* when topics foo-1, foo-x-1, and foo-x-2 exist would ignore the stored offsets for foo-x-1 and foo-x-2, even though those topics match both the old and the new pattern.
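The overlap in the pattern-change case can be computed directly. The helper below (`PatternOverlapDemo.matchedByBoth`, a hypothetical name, again assuming full-string regex matching) returns the topics selected by both the old and the new pattern. For the foo-1 / foo-x-1 / foo-x-2 example it yields foo-x-1 and foo-x-2, the topics whose offsets the current check discards.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical helper: which topics are selected by both the old and the new pattern. */
final class PatternOverlapDemo {

  static List<String> matchedByBoth(String oldPattern, String newPattern, List<String> topics) {
    Pattern oldP = Pattern.compile(oldPattern);
    Pattern newP = Pattern.compile(newPattern);
    // Keep a topic only if both patterns match its entire name.
    return topics.stream()
        .filter(t -> oldP.matcher(t).matches() && newP.matcher(t).matches())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> topics = List.of("foo-1", "foo-x-1", "foo-x-2");
    System.out.println(matchedByBoth("foo-.*", "foo-x-.*", topics)); // [foo-x-1, foo-x-2]
  }
}
```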

We can improve this for Kafka multi-topic ingestion by allowing the Kafka supervisor to override this method (or a protected/public method that it calls). We have to handle:

Topic name → pattern – The checkpoints don’t have a topic name in the partition keys, so if the partitions container’s getStream() matches the new pattern, we need to update the in-memory map to populate this field in the partition key with the value from getStream().

Pattern → topic name – The checkpoints have a topic name in the partition keys, and we need to filter the map by just those entries where the partition key’s topic name exactly matches the new topic name. We ignore the container’s getStream().

Pattern → pattern – The checkpoints have a topic name in the partition keys, and we need to filter the map by just those entries whose key matches the new pattern. We ignore the container’s getStream().
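The three cases above can be sketched roughly as follows. This is an illustration under stated assumptions, not the actual change in #16190: `OffsetReconciler` and `PartitionKey` are hypothetical stand-ins for the supervisor logic and Kafka's multi-topic partition key, a `null` topic models a checkpoint written by a single-topic spec, and the caller is assumed to already know whether the stored and current streams are names or patterns (passed here as explicit flags). The name → name case keeps the existing exact-match behaviour.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical sketch of reconciling stored offsets with a changed topic/topicPattern. */
final class OffsetReconciler {

  /** Hypothetical partition key; topic == null means the checkpoint came from a topic-name spec. */
  record PartitionKey(String topic, int partition) {}

  static Map<PartitionKey, Long> reconcile(
      String storedStream, boolean storedIsPattern, Map<PartitionKey, Long> stored,
      String specStream, boolean specIsPattern) {
    Map<PartitionKey, Long> result = new HashMap<>();
    if (!storedIsPattern && specIsPattern) {
      // Topic name -> pattern: keys lack a topic, so populate it from storedStream,
      // but only if the old topic name matches the new pattern.
      if (Pattern.matches(specStream, storedStream)) {
        stored.forEach((k, v) -> result.put(new PartitionKey(storedStream, k.partition()), v));
      }
    } else if (storedIsPattern && !specIsPattern) {
      // Pattern -> topic name: keep entries whose topic equals the new name; ignore storedStream.
      stored.forEach((k, v) -> {
        if (specStream.equals(k.topic())) {
          result.put(k, v);
        }
      });
    } else if (storedIsPattern) {
      // Pattern -> pattern: keep entries whose topic matches the new pattern; ignore storedStream.
      Pattern p = Pattern.compile(specStream);
      stored.forEach((k, v) -> {
        if (k.topic() != null && p.matcher(k.topic()).matches()) {
          result.put(k, v);
        }
      });
    } else if (specStream.equals(storedStream)) {
      // Topic name -> topic name: unchanged behaviour, exact match required.
      result.putAll(stored);
    }
    return result;
  }
}
```

For example, reconciling a single-topic checkpoint for vehicle against vehicle.* returns the same offsets re-keyed under topic vehicle, while foo-.* → foo-x-.* keeps only the foo-x-1 and foo-x-2 entries.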
