Checkpointing failure after taskDuration in Kafka/Kinesis indexing service #7575

Open
jihoonson opened this issue Apr 30, 2019 · 0 comments

@jihoonson
Contributor

Affected Version

All versions since incremental handoff was introduced

Description

Checkpointing can be initiated by both the supervisor and the tasks. A task can initiate checkpointing whenever it wants to publish segments. The supervisor initiates checkpointing when a task's run time has reached taskDuration. When the supervisor initiates checkpointing, the task changes its status to publishing and stops once it has published all of its segments.
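To make the status change concrete, here is a minimal Java sketch of how a task reacts to a supervisor-initiated checkpoint. `SketchTask`, `Status`, and `checkpoint()` are hypothetical names, not Druid classes, and rejecting a second checkpoint on a publishing task is an assumption modeled on the failure described later in this issue.

```java
// Hypothetical model of a task's checkpoint handling; not Druid code.
public class CheckpointStateSketch {

  enum Status { READING, PUBLISHING }

  static final class SketchTask {
    private Status status = Status.READING;

    Status status() {
      return status;
    }

    // Supervisor-initiated checkpoint after taskDuration: the task stops reading,
    // switches to PUBLISHING, and exits once its segments are published.
    void checkpoint() {
      if (status != Status.READING) {
        // Assumption: a checkpoint request against a task that is already
        // publishing is rejected, which surfaces as a task failure.
        throw new IllegalStateException("task is already " + status);
      }
      status = Status.PUBLISHING;
    }
  }

  public static void main(String[] args) {
    SketchTask task = new SketchTask();
    task.checkpoint();
    System.out.println("after first checkpoint: " + task.status());
    try {
      task.checkpoint();
    } catch (IllegalStateException e) {
      System.out.println("second checkpoint rejected: " + e.getMessage());
    }
  }
}
```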

The supervisor calls checkTaskDuration() to start checkpointing (https://github.com/apache/incubator-druid/blob/master/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L1912-L1969).

Here is the relevant part of the code:

```java
  private void checkTaskDuration() throws ExecutionException, InterruptedException, TimeoutException
  {
    final List<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>> futures = new ArrayList<>();
    // ...
    for (Entry<Integer, TaskGroup> entry : activelyReadingTaskGroups.entrySet()) {
      // ...
      // The group has run for at least taskDuration: start supervisor-initiated checkpointing.
      if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
        log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration());
        futureGroupIds.add(groupId);
        futures.add(checkpointTaskGroup(group, true));
      }
    }

    // Waits for every checkpoint future; throws TimeoutException if any of them
    // is still pending after futureTimeoutInSeconds.
    List<Map<PartitionIdType, SequenceOffsetType>> results = Futures.successfulAsList(futures)
                                                                    .get(futureTimeoutInSeconds, TimeUnit.SECONDS);
    // Only reached when get() returns in time.
    for (int j = 0; j < results.size(); j++) {
      // ...
      activelyReadingTaskGroups.remove(groupId);
    }
  }
```
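The cleanup loop above runs only if the aggregate `get()` returns. The following self-contained sketch reproduces that control flow with the standard library only: `CompletableFuture.allOf(...)` stands in for Guava's `Futures.successfulAsList(...)`, and the static `activelyReadingTaskGroups` map and `checkTaskDurationSketch` method are hypothetical stand-ins for the supervisor's field and method. The two aggregates differ when an input fails, but both time out the same way while any input is still pending.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AggregateTimeoutSketch {

  // Hypothetical stand-in for the supervisor's activelyReadingTaskGroups (groupId -> group).
  static final Map<Integer, String> activelyReadingTaskGroups = new ConcurrentHashMap<>();

  /** Returns true if the aggregate wait timed out; the original rethrows TimeoutException instead. */
  static boolean checkTaskDurationSketch(List<Integer> futureGroupIds,
                                         List<CompletableFuture<String>> futures,
                                         long timeoutMillis)
      throws ExecutionException, InterruptedException {
    try {
      // allOf(...) stands in for Guava's Futures.successfulAsList(...).
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                       .get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // The cleanup below is skipped, so every group stays "actively reading",
      // including groups whose checkpoint future already succeeded.
      return true;
    }
    for (int j = 0; j < futures.size(); j++) {
      activelyReadingTaskGroups.remove(futureGroupIds.get(j));
    }
    return false;
  }

  public static void main(String[] args) throws Exception {
    activelyReadingTaskGroups.put(0, "group-0");
    activelyReadingTaskGroups.put(1, "group-1");

    List<CompletableFuture<String>> futures = new ArrayList<>();
    futures.add(CompletableFuture.completedFuture("group-0 checkpointed")); // succeeded
    futures.add(new CompletableFuture<>());                                   // never completes

    boolean timedOut = checkTaskDurationSketch(Arrays.asList(0, 1), futures, 100);
    System.out.println("timed out: " + timedOut);
    System.out.println("still actively reading: " + activelyReadingTaskGroups.keySet());
  }
}
```

Group 0's checkpoint has already succeeded, yet it is still listed as actively reading, which is the state the next `runNotice` starts from.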

The issue is that checkpointTaskGroup(group, true) can be called more than once for the same task group if Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS) times out. When it times out, some futures may have failed while others have already succeeded. However, activelyReadingTaskGroups is updated only when get() returns successfully; on timeout, the TimeoutException propagates out of checkTaskDuration() before the loop that removes the groups runs. As a result, when checkTaskDuration() is called again for the next runNotice, it can start a duplicate checkpoint for task groups that are still in activelyReadingTaskGroups. Because the previous checkpoint for those groups already succeeded and their tasks are now in publishing status, the duplicate checkpoint fails all tasks in those groups.
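One possible mitigation, sketched here as a hypothesis rather than the fix adopted in Druid, is to move each group out of the actively-reading set as soon as its own checkpoint future succeeds, so a timeout on the aggregate wait cannot leave an already-checkpointed group behind. The example uses `CompletableFuture` in place of Guava's `ListenableFuture`; `activelyReadingTaskGroups`, `publishingGroups`, and `checkpointDueGroups` are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical mitigation sketch; not Druid code.
public class PerGroupRemovalSketch {

  static final Map<Integer, String> activelyReadingTaskGroups = new ConcurrentHashMap<>();
  static final Set<Integer> publishingGroups = ConcurrentHashMap.newKeySet();

  static void checkpointDueGroups(Map<Integer, CompletableFuture<String>> checkpointsByGroup,
                                  long timeoutMillis)
      throws ExecutionException, InterruptedException, TimeoutException {
    List<CompletableFuture<Void>> tracked = new ArrayList<>();
    for (Map.Entry<Integer, CompletableFuture<String>> e : checkpointsByGroup.entrySet()) {
      int groupId = e.getKey();
      // Update this group's state as soon as *its own* checkpoint succeeds,
      // independently of the other groups' futures.
      tracked.add(e.getValue().thenAccept(offsets -> {
        activelyReadingTaskGroups.remove(groupId);
        publishingGroups.add(groupId);
      }));
    }
    // Still propagates TimeoutException, as the original signature does, but by now
    // every group whose checkpoint succeeded has already been moved to publishingGroups.
    CompletableFuture.allOf(tracked.toArray(new CompletableFuture[0]))
                     .get(timeoutMillis, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) throws Exception {
    activelyReadingTaskGroups.put(0, "group-0");
    activelyReadingTaskGroups.put(1, "group-1");

    Map<Integer, CompletableFuture<String>> checkpoints = new HashMap<>();
    checkpoints.put(0, CompletableFuture.completedFuture("offsets-0")); // succeeded
    checkpoints.put(1, new CompletableFuture<>());                       // still pending

    try {
      checkpointDueGroups(checkpoints, 100);
    } catch (TimeoutException e) {
      System.out.println("aggregate wait timed out");
    }
    System.out.println("still actively reading: " + activelyReadingTaskGroups.keySet());
    System.out.println("publishing: " + publishingGroups);
  }
}
```

This only protects groups whose checkpoint has already succeeded. A group whose checkpoint is still in flight when the wait times out stays in activelyReadingTaskGroups, so the next run could still send it a second request; handling that case would need additional state.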
