An empty partition can block data ingestion from Kafka/Kinesis #7600

Open
jihoonson opened this issue May 6, 2019 · 1 comment

@jihoonson
Contributor

Affected Version

All versions since 0.10

Description

When the supervisor is restarted or reset, it first looks up the valid offsets to continue reading from in the metadata storage. If there are no offsets in the metadata storage, it fetches the latest or earliest offset for each partition from Kafka/Kinesis. It's implemented here:

```java
  private ImmutableMap<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> generateStartingSequencesForPartitionGroup(
      int groupId
  )
  {
    ImmutableMap.Builder<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> builder = ImmutableMap.builder();
    for (Entry<PartitionIdType, SequenceOffsetType> entry : partitionGroups.get(groupId).entrySet()) {
      PartitionIdType partition = entry.getKey();
      SequenceOffsetType sequence = entry.getValue();

      if (!getNotSetMarker().equals(sequence)) {
        // if we are given a startingOffset (set by a previous task group which is pending completion) then use it
        if (!isEndOfShard(sequence)) {
          builder.put(partition, makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForNonFirstSequence()));
        }
      } else {
        // if we don't have a startingOffset (first run or we had some previous failures and reset the sequences) then
        // get the sequence from metadata storage (if available) or Kafka/Kinesis (otherwise)
        OrderedSequenceNumber<SequenceOffsetType> offsetFromStorage = getOffsetFromStorageForPartition(partition);

        if (offsetFromStorage != null) {
          builder.put(partition, offsetFromStorage);
        }
      }
    }
    return builder.build();
  }
```

If the supervisor gets an exception while fetching the offset from Kafka/Kinesis for any reason (e.g., a timeout because a partition is empty), the exception propagates out of this method immediately and handling the runNotice fails. This can block ingestion for all partitions until every partition has received at least some data.
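To make the failure mode concrete, here is a minimal, self-contained sketch (not Druid code) that models the loop above as a plain "fetch an offset for every partition" pass. `OffsetFetcher`, `fetchOffset`, and `resolveAll` are hypothetical names, and the empty-partition timeout is simulated with an `IllegalStateException`. Because the per-partition lookups share one method call, a single failing partition means no offsets are returned for the healthy ones either.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StrictOffsetResolution
{
  // Hypothetical stand-in for a per-partition offset lookup against Kafka/Kinesis.
  interface OffsetFetcher
  {
    long fetchOffset(int partition);
  }

  // All-or-nothing: the first exception escapes this method, so offsets that were
  // already resolved for other partitions are discarded along with the local map.
  static Map<Integer, Long> resolveAll(List<Integer> partitions, OffsetFetcher fetcher)
  {
    Map<Integer, Long> offsets = new HashMap<>();
    for (int partition : partitions) {
      offsets.put(partition, fetcher.fetchOffset(partition));
    }
    return offsets;
  }

  public static void main(String[] args)
  {
    // Partition 1 simulates an empty partition whose offset lookup times out.
    OffsetFetcher fetcher = partition -> {
      if (partition == 1) {
        throw new IllegalStateException("timed out fetching offset for partition " + partition);
      }
      return 100L * partition;
    };

    try {
      resolveAll(List.of(0, 1, 2), fetcher);
      System.out.println("resolved");
    }
    catch (IllegalStateException e) {
      // Partitions 0 and 2 are healthy, but the whole run fails anyway.
      System.out.println("run failed: " + e.getMessage());
    }
  }
}
```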

I think the supervisor should skip any partition whose offset it fails to fetch and continue ingesting from the other partitions.
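A rough sketch of the proposed behavior, again using hypothetical names (`OffsetFetcher`, `Resolution`, `resolve`) rather than Druid's actual API, and not the change in #7648: each lookup is wrapped in its own try/catch, so a failure only drops that one partition. The sketch also records the skipped partitions; that is an assumption of this sketch, on the idea that they should be retried on a later supervisor run rather than silently ignored.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TolerantOffsetResolution
{
  // Hypothetical stand-in for a per-partition offset lookup against Kafka/Kinesis.
  interface OffsetFetcher
  {
    long fetchOffset(int partition);
  }

  // Result of one pass: offsets that were resolved, plus partitions to retry later.
  static final class Resolution
  {
    final Map<Integer, Long> offsets = new LinkedHashMap<>();
    final List<Integer> skipped = new ArrayList<>();
  }

  static Resolution resolve(List<Integer> partitions, OffsetFetcher fetcher)
  {
    Resolution result = new Resolution();
    for (int partition : partitions) {
      try {
        result.offsets.put(partition, fetcher.fetchOffset(partition));
      }
      catch (RuntimeException e) {
        // Skip only this partition; the remaining partitions can still start ingesting.
        result.skipped.add(partition);
      }
    }
    return result;
  }

  public static void main(String[] args)
  {
    // Partition 1 simulates an empty partition whose offset lookup times out.
    OffsetFetcher fetcher = partition -> {
      if (partition == 1) {
        throw new IllegalStateException("timed out fetching offset for partition " + partition);
      }
      return 100L * partition;
    };

    Resolution r = resolve(List.of(0, 1, 2), fetcher);
    System.out.println("offsets=" + r.offsets + " skipped=" + r.skipped);
  }
}
```

Catching `RuntimeException` per partition is deliberately broad here; a real fix would likely want to distinguish transient errors (such as timeouts) from configuration errors that should still fail the run.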

@shivtools
Contributor

shivtools commented May 13, 2019

Hey @jihoonson, thanks for creating this issue! I've put up #7648 - let me know if this is what you had in mind.
