Try to fetch the status for an active task from memory #15724

Merged


AmatyaAvadhanula
Contributor

Use the Overlord's in-memory state for active tasks to reduce metadata store calls when fetching their statuses.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@github-actions github-actions bot added Area - Batch Ingestion Area - Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Jan 19, 2024
@AmatyaAvadhanula AmatyaAvadhanula marked this pull request as ready for review January 19, 2024 13:12
@@ -143,7 +143,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
false,
false
);
- private static final String TOPIC_PREFIX = "testTopic";
+ private static final String TOPIC_PREFIX = "testTopi";
Contributor

Why this change?

Contributor Author

Sorry, that was accidental

Map<String, TaskStatus> result = Maps.newHashMapWithExpectedSize(taskIds.size());
for (String taskId : taskIds) {
- Optional<TaskStatus> optional = taskStorageQueryAdapter.getStatus(taskId);
+ final Optional<TaskStatus> optional;
if (taskQueue.isPresent()) {
Contributor

If the task is not present in the taskQueue, we should then check the taskStorageQueryAdapter. It might be already complete.

Contributor Author

TaskQueue#getTaskStatus falls back to the metadata store if the task is not running:

  public Optional<TaskStatus> getTaskStatus(final String taskId)
  {
    RunnerTaskState runnerTaskState = taskRunner.getRunnerTaskState(taskId);
    if (runnerTaskState != null && runnerTaskState != RunnerTaskState.NONE) {
      return Optional.of(TaskStatus.running(taskId).withLocation(taskRunner.getTaskLocation(taskId)));
    } else {
      return taskStorage.getStatus(taskId);
    }
  }

Would it be better to move this fallback-to-DB logic out of the TaskQueue and into the calling method?

Contributor

Ah, I see, I didn't realize that the fallback logic was in the TaskQueue method. In that case this code is OK.
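
The lookup order settled on in this thread can be sketched in isolation. This is a simplified, self-contained model rather than Druid's code: `StatusLookupSketch`, `Storage`, and `InMemoryQueue` are hypothetical stand-ins, statuses are plain strings, and the read counter exists only to show when the metadata store is actually touched.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified model of the lookup order discussed above.
// None of these classes are Druid's; statuses are plain strings.
public class StatusLookupSketch
{
  // Stand-in for the metadata store. Counts reads so the saving is visible.
  static class Storage
  {
    final Map<String, String> statuses = new HashMap<>();
    int reads = 0;

    Optional<String> getStatus(String taskId)
    {
      reads++;
      return Optional.ofNullable(statuses.get(taskId));
    }
  }

  // Stand-in for the task queue: answers from memory for active tasks
  // and falls back to the store for everything else.
  static class InMemoryQueue
  {
    final Set<String> activeTaskIds = new HashSet<>();
    private final Storage storage;

    InMemoryQueue(Storage storage)
    {
      this.storage = storage;
    }

    Optional<String> getTaskStatus(String taskId)
    {
      if (activeTaskIds.contains(taskId)) {
        return Optional.of("RUNNING");
      }
      return storage.getStatus(taskId);
    }
  }

  // Caller side: use the queue when one is available, otherwise read the store.
  static Optional<String> getStatus(Optional<InMemoryQueue> queue, Storage storage, String taskId)
  {
    if (queue.isPresent()) {
      return queue.get().getTaskStatus(taskId);
    }
    return storage.getStatus(taskId);
  }

  public static void main(String[] args)
  {
    Storage storage = new Storage();
    storage.statuses.put("finished-task", "SUCCESS");
    InMemoryQueue queue = new InMemoryQueue(storage);
    queue.activeTaskIds.add("active-task");

    System.out.println(getStatus(Optional.of(queue), storage, "active-task"));
    System.out.println(getStatus(Optional.of(queue), storage, "finished-task"));
    System.out.println("metadata reads: " + storage.reads);
  }
}
```

Because the queue itself falls back to the store, the caller needs no second check for tasks that have already completed, which is the point the reviewer accepted above.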

@@ -671,7 +672,7 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus, String r
// Save status to metadata store first, so if we crash while doing the rest of the shutdown, our successor
// remembers that this task has completed.
try {
- final Optional<TaskStatus> previousStatus = taskStorage.getStatus(task.getId());
+ final Optional<TaskStatus> previousStatus = getTaskStatus(task.getId());
Contributor

This really should use the metadata store. The code block is only called when a task completes, and we need to check to make sure the metadata store has the correct status stored.

Contributor Author

Thanks, added a comment as well
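
A rough sketch of why the completion path reads the persisted status directly. It assumes, for illustration only, that a final status is written only while the stored status is still running; `CompletionCheckSketch` and `Storage` are hypothetical names and statuses are plain strings, so this is not the actual `notifyStatus` implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of a completion handler; not Druid's notifyStatus.
public class CompletionCheckSketch
{
  // Stand-in for the metadata store.
  static class Storage
  {
    private final Map<String, String> statuses = new HashMap<>();

    Optional<String> getStatus(String taskId)
    {
      return Optional.ofNullable(statuses.get(taskId));
    }

    void setStatus(String taskId, String status)
    {
      statuses.put(taskId, status);
    }
  }

  // Returns true if the final status was persisted, and false if the store
  // had no record of the task or already held a non-running status.
  static boolean persistCompletion(Storage storage, String taskId, String finalStatus)
  {
    // Read what is actually persisted. An in-memory view could still report
    // the task as running and would hide an already-recorded completion.
    Optional<String> previous = storage.getStatus(taskId);
    if (!previous.isPresent() || !"RUNNING".equals(previous.get())) {
      return false; // assumed behaviour: ignore the notification
    }
    storage.setStatus(taskId, finalStatus);
    return true;
  }

  public static void main(String[] args)
  {
    Storage storage = new Storage();
    storage.setStatus("task", "RUNNING");
    System.out.println(persistCompletion(storage, "task", "SUCCESS"));
    System.out.println(persistCompletion(storage, "task", "FAILED"));
  }
}
```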


if (response.getStatus() != null) {
return response.getStatus().getLocation();
Contributor

We need the same change in SpecificTaskServiceLocator, to use taskStatuses instead of taskStatus.

Contributor Author

Done

@@ -894,7 +885,12 @@ public TaskLocation getTaskLocation(final String id)
@Override
public Optional<TaskStatus> getTaskStatus(String id)
{
- return taskStorage.getStatus(id);
+ final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
Contributor

If the task is not present in the taskQueue, we should then check the taskStorageQueryAdapter. It might be already complete.

Contributor

@gianm gianm left a comment

LGTM

@gianm
Contributor

gianm commented Feb 24, 2024

The code looks good to me, but there are some failures due to code coverage. @AmatyaAvadhanula please see if it's straightforward to add coverage, and if it isn't please let us know.

EasyMock.expect(taskQueue.getTaskStatus("task"))
.andReturn(Optional.of(TaskStatus.running("task")));
EasyMock.replay(taskQueue);
TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);

Check notice

Code scanning / CodeQL

Possible confusion of local and field Note test

Confusing name: method testGetMultipleTaskStatuses_presentTaskQueue also refers to field taskMaster (without qualifying it with 'this').
workerTaskRunnerQueryAdapter
);

TaskStorageQueryAdapter taskStorageQueryAdapter = EasyMock.createMock(TaskStorageQueryAdapter.class);

Check notice

Code scanning / CodeQL

Possible confusion of local and field Note test

Confusing name: method testGetMultipleTaskStatuses_absentTaskQueue also refers to field taskStorageQueryAdapter (without qualifying it with 'this').
EasyMock.expect(taskStorageQueryAdapter.getStatus("task"))
.andReturn(Optional.of(TaskStatus.running("task")));
EasyMock.replay(taskStorageQueryAdapter);
TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);

Check notice

Code scanning / CodeQL

Possible confusion of local and field Note test

Confusing name: method testGetMultipleTaskStatuses_absentTaskQueue also refers to field taskMaster (without qualifying it with 'this').
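
For readers unfamiliar with this notice, a minimal illustration of the pattern it describes: a method reads a field without `this` and also declares a local of the same name. `ShadowingSketch` and its string values are hypothetical and unrelated to the test class flagged above.

```java
// Hypothetical illustration of the "local and field" naming notice.
public class ShadowingSketch
{
  // Field shared across the class.
  private final String taskMaster = "field-level mock";

  // Confusing: the same name means the field on the first line
  // and the local from the second line onwards.
  String confusing()
  {
    String before = taskMaster;          // resolves to the field
    String taskMaster = "local mock";    // from here on, the name is the local
    return before + " / " + taskMaster;
  }

  // Clearer: a distinct local name, and `this.` for the field.
  String clearer()
  {
    String localTaskMaster = "local mock";
    return this.taskMaster + " / " + localTaskMaster;
  }

  public static void main(String[] args)
  {
    ShadowingSketch sketch = new ShadowingSketch();
    System.out.println(sketch.confusing());
    System.out.println(sketch.clearer());
  }
}
```

Both methods return the same string; the second simply makes it obvious which variable each name refers to.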
@AmatyaAvadhanula
Contributor Author

Thank you for the reviews @gianm @abhishekagarwal87
I've fixed the coverage in one of the modules, but it's not straightforward in the other one.

@AmatyaAvadhanula AmatyaAvadhanula merged commit e2b7289 into apache:master Feb 26, 2024
77 of 83 checks passed
@AmatyaAvadhanula AmatyaAvadhanula changed the title from "Try to fetch the task status for an active from memory" to "Try to fetch the status for an active task from memory" Mar 19, 2024
@kfaraz kfaraz deleted the optimize_task_status_calls branch March 28, 2024 08:09
kfaraz pushed a commit that referenced this pull request Apr 16, 2024
)

Bug:
#15724 introduced a bug where a rolling upgrade would cause all task locations
returned by the Overlord on an older version to be unknown.

Fix:
If the new API fails, fall back to single task status API which always returns a valid task location.
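
The fix described in this commit message amounts to a try-then-fall-back pattern. A minimal sketch follows, with hypothetical names (`LocationFallbackSketch`, `LocationApi`) standing in for the real client calls and plain strings standing in for task locations; it assumes a failure of the new API surfaces as a runtime exception.

```java
// Hypothetical sketch of the fallback described above; not the actual fix.
public class LocationFallbackSketch
{
  // Stand-in for a client call that resolves a task's location.
  interface LocationApi
  {
    String fetchLocation(String taskId);
  }

  // Try the newer API first; if it fails (for example against an older
  // server during a rolling upgrade), use the single-task status API.
  static String locate(LocationApi newApi, LocationApi singleTaskApi, String taskId)
  {
    try {
      return newApi.fetchLocation(taskId);
    }
    catch (RuntimeException e) {
      return singleTaskApi.fetchLocation(taskId);
    }
  }

  public static void main(String[] args)
  {
    LocationApi unsupported = taskId -> {
      throw new UnsupportedOperationException("not available on this server version");
    };
    LocationApi single = taskId -> "host-for-" + taskId;
    System.out.println(locate(unsupported, single, "task"));
  }
}
```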
@adarshsanjeev adarshsanjeev added this to the 30.0.0 milestone May 6, 2024
Labels
Area - Batch Ingestion Area - Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 Area - Streaming Ingestion