In SeekableStreamSupervisor::getOffsetsFromMetadataStorage(), we do:
    if (!ioConfig.getStream().equals(partitions.getStream())) {
      log.warn(
          "Topic/stream in metadata storage [%s] doesn't match spec topic/stream [%s], ignoring stored sequences",
          partitions.getStream(),
          ioConfig.getStream()
      );
      return Collections.emptyMap();
    }
This code is inherited by the Kafka supervisor. Here, ioConfig is the ingestion spec, and partitions is the stored checkpoints for this datasource from the metadata store. The check tests whether ioConfig.getStream() is the same string as partitions.getStream(); if it isn’t, we ignore the stored offsets. This interacts awkwardly with Kafka’s multi-topic support:
ioConfig.getStream() is either topic (name) or topicPattern (pattern), whichever is set. (It’s an error for both to be set.) Therefore, it represents either a single, static topic name or a regular expression pattern.
partitions.getStream() is the ioConfig.getStream() that was used for the last streaming ingestion job on this datasource. As above, this means it can be a name or a pattern.
As a result, stored offsets are ignored in several situations where they are still valid, which may cause inadvertent data duplication:
Converting from a topic name to a pattern, even if the pattern matches the previous name. For example, converting from topic: vehicle to topicPattern: vehicle.*.
The inverse situation, converting from a topic pattern to a name, even if the name matches the previous pattern. For example, converting from topicPattern: topicName.* to topic: topicName.
Changing the pattern where the set of topics selected by the old pattern and the new pattern overlap. For example, changing from topicPattern: foo-.* to topicPattern: foo-x-.* when there are topics like foo-1, foo-x-1, and foo-x-2 would ignore the stored offsets for foo-x-1 and foo-x-2, even though they matched both before and after.
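To make the mismatch concrete, here is a minimal sketch of the two comparisons at play. The class and method names are hypothetical and not part of Druid, and full-string regex matching is an assumption about how a topicPattern is applied. The first method mirrors the string equality used by the existing check; the second shows what a pattern-aware comparison would report for the same inputs.

```java
import java.util.regex.Pattern;

// Hypothetical illustration only; not Druid code.
public class StreamEqualityCheck {
    // Mirrors the existing check: stored offsets survive only if the two strings are equal.
    static boolean storedOffsetsKept(String specStream, String storedStream) {
        return specStream.equals(storedStream);
    }

    // Assumption: a topicPattern must match the whole topic name.
    static boolean patternCovers(String topicPattern, String topicName) {
        return Pattern.compile(topicPattern).matcher(topicName).matches();
    }
}
```

With the spec changed from topic: vehicle to topicPattern: vehicle.*, storedOffsetsKept("vehicle.*", "vehicle") is false, so the offsets are dropped, even though patternCovers("vehicle.*", "vehicle") is true.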
We can improve this for Kafka multi-topic by allowing Kafka to override this method (or some protected/public method it calls). We have to handle:
Topic name → pattern – The checkpoints don’t have a topic name in the partition keys, so if the partitions container’s getStream() matches the new pattern, we need to update the in-memory map to populate this field in the partition key with the value from getStream().
Pattern → topic name – The checkpoints have a topic name in the partition keys, and we need to filter the map by just those entries where the partition key’s topic name exactly matches the new topic name. We ignore the container’s getStream().
Pattern → pattern – The checkpoints have a topic name in the partition keys, and we need to filter the map by just those entries whose key matches the new pattern. We ignore the container’s getStream().
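The three cases above could be handled along the following lines. This is a rough sketch under stated assumptions, not the Druid implementation: StoredOffsetReconciler, TopicPartitionKey, reconcile, and the newIsPattern flag are all hypothetical names, a null topic stands in for a partition key written by a single-topic job, and patterns are assumed to match the whole topic name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch; none of these types exist in Druid under these names.
public class StoredOffsetReconciler {
    // Stand-in for a partition key. topic is null when the checkpoint
    // was written by a single-topic job (assumption for this sketch).
    record TopicPartitionKey(String topic, int partition) {}

    static Map<TopicPartitionKey, Long> reconcile(
        String storedStream,                        // partitions.getStream()
        Map<TopicPartitionKey, Long> storedOffsets, // stored checkpoints
        String newStream,                           // ioConfig.getStream()
        boolean newIsPattern                        // true if the spec sets topicPattern
    ) {
        Pattern pattern = newIsPattern ? Pattern.compile(newStream) : null;
        Map<TopicPartitionKey, Long> kept = new HashMap<>();
        for (Map.Entry<TopicPartitionKey, Long> entry : storedOffsets.entrySet()) {
            TopicPartitionKey key = entry.getKey();
            // Keys without a topic came from a single-topic job, so the
            // container's stream is the topic name. Otherwise the key's own
            // topic is used and the container's stream is ignored.
            String topic = key.topic() != null ? key.topic() : storedStream;
            boolean matches = newIsPattern
                ? pattern.matcher(topic).matches()
                : topic.equals(newStream);
            if (matches) {
                // When moving to a pattern, populate the topic in the key.
                TopicPartitionKey outKey =
                    newIsPattern ? new TopicPartitionKey(topic, key.partition()) : key;
                kept.put(outKey, entry.getValue());
            }
        }
        return kept;
    }
}
```

A topic name to a different topic name still yields an empty map here, which matches the behavior of the existing check. Whether keys should keep their topic field after a pattern → topic name change is left open in this sketch; it simply returns the stored keys unchanged in that case.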