-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Try to fetch the status for an active task from memory #15724
Try to fetch the status for an active task from memory #15724
Conversation
@@ -143,7 +143,7 @@ public class KafkaSupervisorTest extends EasyMockSupport | |||
false, | |||
false | |||
); | |||
private static final String TOPIC_PREFIX = "testTopic"; | |||
private static final String TOPIC_PREFIX = "testTopi"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, that was accidental
Map<String, TaskStatus> result = Maps.newHashMapWithExpectedSize(taskIds.size()); | ||
for (String taskId : taskIds) { | ||
Optional<TaskStatus> optional = taskStorageQueryAdapter.getStatus(taskId); | ||
final Optional<TaskStatus> optional; | ||
if (taskQueue.isPresent()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the task is not present in the taskQueue
, we should then check the taskStorageQueryAdapter
. It might be already complete.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TaskQueue#getStatus looks at the metadata store if the task is not running
public Optional<TaskStatus> getTaskStatus(final String taskId)
{
RunnerTaskState runnerTaskState = taskRunner.getRunnerTaskState(taskId);
if (runnerTaskState != null && runnerTaskState != RunnerTaskState.NONE) {
return Optional.of(TaskStatus.running(taskId).withLocation(taskRunner.getTaskLocation(taskId)));
} else {
return taskStorage.getStatus(taskId);
}
}
Would it be better to move this fallback to db logic out of the TaskQueue in the calling method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see, I didn't realize that the fallback logic was in the TaskQueue
method. In that case this code is OK.
@@ -671,7 +672,7 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus, String r | |||
// Save status to metadata store first, so if we crash while doing the rest of the shutdown, our successor | |||
// remembers that this task has completed. | |||
try { | |||
final Optional<TaskStatus> previousStatus = taskStorage.getStatus(task.getId()); | |||
final Optional<TaskStatus> previousStatus = getTaskStatus(task.getId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This really should use the metadata store. The code block is only called when a task completes, and we need to check to make sure the metadata store has the correct status stored.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, added a comment as well
|
||
if (response.getStatus() != null) { | ||
return response.getStatus().getLocation(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need the same change in SpecificTaskServiceLocator
, to use taskStatuses
instead of taskStatus
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -894,7 +885,12 @@ public TaskLocation getTaskLocation(final String id) | |||
@Override | |||
public Optional<TaskStatus> getTaskStatus(String id) | |||
{ | |||
return taskStorage.getStatus(id); | |||
final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the task is not present in the taskQueue
, we should then check the taskStorageQueryAdapter
. It might be already complete.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Map<String, TaskStatus> result = Maps.newHashMapWithExpectedSize(taskIds.size()); | ||
for (String taskId : taskIds) { | ||
Optional<TaskStatus> optional = taskStorageQueryAdapter.getStatus(taskId); | ||
final Optional<TaskStatus> optional; | ||
if (taskQueue.isPresent()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see, I didn't realize that the fallback logic was in the TaskQueue
method. In that case this code is OK.
The code looks good to me, but there are some failures due to code coverage. @AmatyaAvadhanula please see if it's straightforward to add coverage, and if it isn't please let us know. |
EasyMock.expect(taskQueue.getTaskStatus("task")) | ||
.andReturn(Optional.of(TaskStatus.running("task"))); | ||
EasyMock.replay(taskQueue); | ||
TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); |
Check notice
Code scanning / CodeQL
Possible confusion of local and field Note test
testGetMultipleTaskStatuses_presentTaskQueue
taskMaster
workerTaskRunnerQueryAdapter | ||
); | ||
|
||
TaskStorageQueryAdapter taskStorageQueryAdapter = EasyMock.createMock(TaskStorageQueryAdapter.class); |
Check notice
Code scanning / CodeQL
Possible confusion of local and field Note test
testGetMultipleTaskStatuses_absentTaskQueue
taskStorageQueryAdapter
EasyMock.expect(taskStorageQueryAdapter.getStatus("task")) | ||
.andReturn(Optional.of(TaskStatus.running("task"))); | ||
EasyMock.replay(taskStorageQueryAdapter); | ||
TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); |
Check notice
Code scanning / CodeQL
Possible confusion of local and field Note test
testGetMultipleTaskStatuses_absentTaskQueue
taskMaster
Thank you for the reviews @gianm @abhishekagarwal87 |
) Bug: #15724 introduced a bug where a rolling upgrade would cause all task locations returned by the Overlord on an older version to be unknown. Fix: If the new API fails, fall back to single task status API which always returns a valid task location.
Utilize the in memory state of the Overlord for active tasks to reduce metadata calls while fetching their statuses
This PR has: