Affected Version
All versions since incremental handoff was introduced
Description
Checkpointing can be initiated by both the supervisor and the tasks. A task can initiate checkpointing whenever it wants to publish segments. The supervisor initiates checkpointing when a task's run time reaches taskDuration. When the supervisor initiates checkpointing, the task changes its status to publishing and stops once it has published all of its segments.
private void checkTaskDuration() throws ExecutionException, InterruptedException, TimeoutException
{
  final List<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>> futures = new ArrayList<>();
  ...
  for (Entry<Integer, TaskGroup> entry : activelyReadingTaskGroups.entrySet()) {
    ...
    if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
      log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration());
      futureGroupIds.add(groupId);
      futures.add(checkpointTaskGroup(group, true));
    }
  }
  List<Map<PartitionIdType, SequenceOffsetType>> results = Futures.successfulAsList(futures)
                                                                  .get(futureTimeoutInSeconds, TimeUnit.SECONDS);
  for (int j = 0; j < results.size(); j++) {
    ...
    activelyReadingTaskGroups.remove(groupId);
  }
}
The issue is that checkpointTaskGroup(group, true) can be called more than once for the same taskGroup if Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS) fails with a timeout. When it times out, some futures may have failed while others have already succeeded. However, activelyReadingTaskGroups is updated only when futures.get() returns successfully. As a result, the next time checkTaskDuration is called in a runNotice, it can start a duplicate checkpoint for any taskGroup that is still in activelyReadingTaskGroups. All tasks in those taskGroups then fail, because the previous checkpoint succeeded and they are already in publishing status.
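To make the failure mode concrete, here is a minimal, self-contained sketch of one possible direction: after the timed wait, inspect each future individually and update the bookkeeping for every group whose checkpoint already completed, even if the combined wait timed out. This is not Druid code and not necessarily how the project fixes it. The class name, the settleCheckpoints helper, the String result type, and the use of the JDK's CompletableFuture in place of Guava's ListenableFuture are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch; names and types are illustrative, not Druid's API.
final class CheckpointTimeoutSketch
{
  /**
   * Waits up to timeoutMillis for all checkpoint futures, then removes from
   * activelyReading every group whose future completed normally, regardless
   * of whether the combined wait timed out or another future failed.
   * Returns the ids of the groups that were removed.
   */
  static List<Integer> settleCheckpoints(
      Map<Integer, CompletableFuture<String>> checkpointFutures,
      Set<Integer> activelyReading,
      long timeoutMillis
  ) throws InterruptedException
  {
    CompletableFuture<Void> all = CompletableFuture.allOf(
        checkpointFutures.values().toArray(new CompletableFuture<?>[0])
    );
    try {
      all.get(timeoutMillis, TimeUnit.MILLISECONDS);
    }
    catch (TimeoutException | ExecutionException e) {
      // Do not bail out here: some futures may already have succeeded,
      // and those groups must not be checkpointed again on the next run.
    }

    List<Integer> checkpointed = new ArrayList<>();
    for (Map.Entry<Integer, CompletableFuture<String>> entry : checkpointFutures.entrySet()) {
      CompletableFuture<String> future = entry.getValue();
      // isDone() && !isCompletedExceptionally() means normal completion.
      if (future.isDone() && !future.isCompletedExceptionally()) {
        activelyReading.remove(entry.getKey());
        checkpointed.add(entry.getKey());
      }
    }
    return checkpointed;
  }

  public static void main(String[] args) throws InterruptedException
  {
    Set<Integer> activelyReading = new HashSet<>(List.of(0, 1));
    Map<Integer, CompletableFuture<String>> futures = new LinkedHashMap<>();
    futures.put(0, CompletableFuture.completedFuture("offsets-0")); // checkpoint succeeded
    futures.put(1, new CompletableFuture<>());                      // never completes, forces the timeout

    List<Integer> done = settleCheckpoints(futures, activelyReading, 50);
    // Prints: checkpointed=[0], still reading=[1]
    System.out.println("checkpointed=" + done + ", still reading=" + activelyReading);
  }
}
```

In this sketch group 0 leaves the actively-reading set even though the overall wait timed out, so a later pass would not checkpoint it a second time. A checkpoint that completes on the task side only after the timeout would still be missed, so this narrows the window rather than closing it.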
The supervisor calls checkTaskDuration() to start checkpointing (https://github.com/apache/incubator-druid/blob/master/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L1912-L1969); the code excerpt in this description comes from that method.