When the supervisor is restarted or reset, it first looks in metadata storage for valid offsets to continue reading from. If there are no offsets in metadata storage, it fetches the latest or earliest offset from Kafka/Kinesis for each partition. It's implemented here.
    private ImmutableMap<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> generateStartingSequencesForPartitionGroup(
        int groupId
    )
    {
      ImmutableMap.Builder<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> builder = ImmutableMap.builder();
      for (Entry<PartitionIdType, SequenceOffsetType> entry : partitionGroups.get(groupId).entrySet()) {
        PartitionIdType partition = entry.getKey();
        SequenceOffsetType sequence = entry.getValue();
        if (!getNotSetMarker().equals(sequence)) {
          // if we are given a startingOffset (set by a previous task group which is pending completion) then use it
          if (!isEndOfShard(sequence)) {
            builder.put(partition, makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForNonFirstSequence()));
          }
        } else {
          // if we don't have a startingOffset (first run or we had some previous failures and reset the sequences) then
          // get the sequence from metadata storage (if available) or Kafka/Kinesis (otherwise)
          OrderedSequenceNumber<SequenceOffsetType> offsetFromStorage = getOffsetFromStorageForPartition(partition);
          if (offsetFromStorage != null) {
            builder.put(partition, offsetFromStorage);
          }
        }
      }
      return builder.build();
    }
If the supervisor hits an exception while fetching the offset from Kafka/Kinesis (e.g., a timeout because of an empty partition), this method exits immediately with that exception, and handling of the runNotice fails. This can block the entire data ingestion until data has been ingested into every partition.
I think the supervisor should be able to skip the partitions for which fetching the offset fails and continue ingestion for the other partitions.
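To make the proposal concrete, here is a minimal standalone sketch of the skip-on-failure behavior. It is not Druid code: the class `StartingOffsets`, the method `resolveSkippingFailures`, and the `lookup` function (standing in for something like `getOffsetFromStorageForPartition`) are hypothetical names, and catching `RuntimeException` is an assumption about how the fetch failure surfaces.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration only; none of these names exist in Druid.
class StartingOffsets
{
  /**
   * Resolves a starting offset for each partition. A partition whose lookup throws
   * is logged and skipped, so one failing partition does not fail the whole call.
   */
  static <P, O> Map<P, O> resolveSkippingFailures(Iterable<P> partitions, Function<P, O> lookup)
  {
    Map<P, O> resolved = new LinkedHashMap<>();
    for (P partition : partitions) {
      try {
        O offset = lookup.apply(partition);
        if (offset != null) {
          resolved.put(partition, offset);
        }
      }
      catch (RuntimeException e) {
        // Assumption: the failure (e.g. a timeout) surfaces as an unchecked exception.
        System.err.println("Skipping partition " + partition + ": " + e.getMessage());
      }
    }
    return resolved;
  }

  public static void main(String[] args)
  {
    // Partition 1 simulates a timeout; partitions 0 and 2 resolve normally.
    Map<Integer, Long> offsets = resolveSkippingFailures(
        Arrays.asList(0, 1, 2),
        p -> {
          if (p == 1) {
            throw new IllegalStateException("timeout");
          }
          return 100L + p;
        }
    );
    System.out.println(offsets); // prints {0=100, 2=102}
  }
}
```

A skipped partition would still need to be retried on a later run so that it is picked up once its offset becomes available; the sketch leaves that part out.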
Affected Version
All versions since 0.10