Prohibit Futures.addCallback(Future, Callback) #8809

Closed
wants to merge 4 commits into from

zhenxiao
Contributor

@zhenxiao zhenxiao commented Nov 2, 2019

@leventov
Member

leventov commented Nov 2, 2019

Mechanically adding Execs.directExecutor() doesn't solve the problem. You should check every case individually and verify that Execs.directExecutor() is an acceptable option there. Each case should meet at least one of the following three criteria, as described in https://github.com/code-review-checklists/java-concurrency#cf-beware-non-async:

  1. The callback is lightweight and non-blocking (as is the case in your PR #8700, "In DirectDruidClient, don't run Future cancellation listener in HTTP library executor"); or
  2. The callback is added from the same executor in which the future is completed; or
  3. The callback attachment is preceded by an if (future.isDone()) check.

Then, the specific criterion should be identified in a comment at the call site, similar to what I asked you to do here: #8700 (comment).

Otherwise (if the case meets none of these criteria), directExecutor() should not be used.
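A minimal JDK-only sketch of why these criteria matter, using CompletableFuture as a stand-in for Guava's Futures.addCallback(future, callback, executor) (the PR itself concerns the Guava API; the class and thread names below are hypothetical). A non-async method such as thenAccept() behaves much like attaching a callback with directExecutor(): in OpenJDK's implementation, if the future is already done (criterion 3), the callback runs immediately in the attaching thread; otherwise it runs in whichever thread completes the future, possibly a thread of another component's pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class DirectCallbackCriteria {
  /**
   * Criterion 3: isDone() is checked before the callback is attached, so the non-async
   * thenAccept() runs the callback synchronously in the calling thread.
   * Returns the name of the thread that ran the callback.
   */
  static String attachAfterIsDoneCheck(CompletableFuture<String> future) {
    if (!future.isDone()) {
      throw new IllegalStateException("this sketch only covers the already-completed case");
    }
    AtomicReference<String> ranOn = new AtomicReference<>();
    future.thenAccept(value -> ranOn.set(Thread.currentThread().getName()));
    return ranOn.get();
  }

  /**
   * No criterion applies: the callback is attached before completion, so it runs in
   * whichever thread completes the future, here a thread of another component's pool.
   */
  static String attachBeforeCompletion() {
    ExecutorService completingPool =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "completing-pool"));
    try {
      CompletableFuture<String> future = new CompletableFuture<>();
      AtomicReference<String> ranOn = new AtomicReference<>();
      CompletableFuture<Void> callbackDone =
          future.thenAccept(value -> ranOn.set(Thread.currentThread().getName()));
      // Complete the future from inside the "foreign" pool.
      CompletableFuture.runAsync(() -> future.complete("result"), completingPool).join();
      callbackDone.join();
      return ranOn.get();
    } finally {
      completingPool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(attachAfterIsDoneCheck(CompletableFuture.completedFuture("x"))); // main
    System.out.println(attachBeforeCompletion()); // completing-pool
  }
}
```

The second case is why criterion 1 asks for a lightweight, non-blocking callback: a direct callback borrows a thread that its author does not control.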

@leventov
Member

leventov commented Nov 2, 2019

A forbidden-apis entry should also be added. See an example here: 15fbf59#diff-4cfb6baf06c34e537a8666d9a0f3aebd

@zhenxiao
Contributor Author

zhenxiao commented Nov 6, 2019

Thank you, @leventov.
Comments addressed.
I force-pushed and put this work on top of #8700, so that it compiles with the forbidden-apis check.
Only the second commit is up for review.

@@ -250,7 +254,8 @@ public void onFailure(Throwable t)
LOG.error(t, "Error while running a task for subTaskSpec[%s]", spec);
taskCompleteEvents.offer(SubTaskCompleteEvent.fail(spec, t));
}
}
},
blockingQueueHandler
Member

taskCompleteEvents.offer(completeEvent); -- BlockingQueue.offer() is actually non-blocking

Member

@leventov leventov Nov 6, 2019

Correction: BlockingQueue.offer() still needs to enter a critical section, but it may be considered "mostly non-blocking", so directExecutor() should still be OK.
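To illustrate the distinction, a small JDK-only sketch (the class name is hypothetical, and the queue is deliberately bounded, which may not match the real taskCompleteEvents): offer() never waits for free space; on a full queue it returns false. In OpenJDK, ArrayBlockingQueue.offer() still briefly takes the queue's internal ReentrantLock, which is the critical section that makes it "mostly" rather than strictly non-blocking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class OfferIsMostlyNonBlocking {
  /** Offers three events to a queue with capacity 2 and records each offer() result. */
  static List<Boolean> offerThree() {
    BlockingQueue<String> taskCompleteEvents = new ArrayBlockingQueue<>(2);
    List<Boolean> results = new ArrayList<>();
    for (String event : new String[] {"a", "b", "c"}) {
      // offer() returns immediately; on a full queue it returns false instead of waiting,
      // whereas put() would block until space becomes available.
      results.add(taskCompleteEvents.offer(event));
    }
    return results;
  }

  public static void main(String[] args) {
    System.out.println(offerThree()); // [true, true, false]
  }
}
```

With a bounded queue, the false return value has to be handled, otherwise the event is silently dropped; with an unbounded queue, offer() always succeeds.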

@@ -61,15 +61,19 @@ public void onFailure(Throwable t)
{
finalFuture.setException(t);
}
});
},
// The callback is non-blocking and quick, so it's OK to schedule it using directExecutor()
Member

Too much indentation

Contributor Author

It seems this is the indentation suggested when compiling the code. Anyway, let me refactor it to make it more readable.

}

@Override
public void onFailure(Throwable t)
{
finalFuture.setException(t);
}
});
},
// The callback is non-blocking and quick, so it's OK to schedule it using directExecutor()
Member

Same

@@ -256,7 +256,9 @@ public void onFailure(Throwable throwable)
waitingForMonitor.notifyAll();
}
}
}
},
// The callback is non-blocking and quick, so it's OK to schedule it using directExecutor()
Member

synchronized (waitingForMonitor) is blocking. You may argue that this lock is (almost) always free for some reason, in which case the callback could still be considered "mostly non-blocking" and OK to schedule on directExecutor(), but either way that argument demands a more elaborate comment.

@@ -556,7 +559,8 @@ private void handleStatus(final TaskStatus status)
.emit();
}
}
}
},
statusHandler
Member

From this point in the code, it's not obvious that attachCallbacks() is called from a different thread pool than the one in which statusFuture is completed.

There are two options to improve this:

  • Propagate the Executor argument and pass statusHandler only in manage(); additionally, amend the javadoc comment for manage() to note that it runs on {@link #managerExec}.
  • Just add a more elaborate comment right here.

In either case, it won't be obvious to readers why attaching the callback from a different thread pool calls for a dedicated executor, so the comment should look something like this:
"Using dedicated statusHandler executor instead of directExecutor() because the callback's onSuccess() is not trivial and because the statusFuture is completed in some encapsulated thread pool in TaskRunner; directExecutor() may create operational instability and subtle dependency between components here. See https://github.com/code-review-checklists/java-concurrency#cf-beware-non-async for details."

Something similar should be done in all other places where you use a custom executor instead of directExecutor().
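A sketch of the thread-hopping difference the suggested comment describes, again with CompletableFuture standing in for the Guava API (taskRunnerPool and the thread names are hypothetical; statusHandler mirrors the executor discussed above). The non-async callback runs in the pool that completes statusFuture, while the callback attached with an explicit executor always runs on statusHandler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class DedicatedCallbackExecutor {
  /**
   * Completes statusFuture inside a "task runner" pool and returns the names of the threads
   * that ran a direct callback and a callback scheduled on the dedicated statusHandler.
   */
  static String[] callbackThreads() {
    ExecutorService taskRunnerPool =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "task-runner"));
    ExecutorService statusHandler =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "status-handler"));
    try {
      CompletableFuture<String> statusFuture = new CompletableFuture<>();
      AtomicReference<String> directThread = new AtomicReference<>();
      AtomicReference<String> dedicatedThread = new AtomicReference<>();

      // Like directExecutor(): runs in whichever thread completes statusFuture.
      CompletableFuture<Void> direct =
          statusFuture.thenAccept(s -> directThread.set(Thread.currentThread().getName()));
      // Like passing statusHandler explicitly: the callback always hops to statusHandler.
      CompletableFuture<Void> dedicated = statusFuture.thenAcceptAsync(
          s -> dedicatedThread.set(Thread.currentThread().getName()), statusHandler);

      // The future is completed inside the task runner's own (encapsulated) pool.
      CompletableFuture.runAsync(() -> statusFuture.complete("SUCCESS"), taskRunnerPool).join();
      CompletableFuture.allOf(direct, dedicated).join();
      return new String[] {directThread.get(), dedicatedThread.get()};
    } finally {
      taskRunnerPool.shutdown();
      statusHandler.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(String.join(" ", callbackThreads())); // task-runner status-handler
  }
}
```

The dedicated executor decouples the callback's cost from the task runner's threads, at the price of one extra executor that must be shut down with its owner.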

@@ -1008,7 +1014,8 @@ public void onFailure(Throwable t)
log.error(t, "Error while publishing segments for sequenceNumber[%s]", sequenceMetadata);
handoffFuture.setException(t);
}
}
},
sequencePersistExecutor
Member

I'll refrain from analyzing the execution flows and thread pool relationships here. @jihoonson, could you please vet this?

Contributor

Sure, I will take a look soon.

Member

This still needs to be reviewed by maintainers of SeekableStreamIndexTaskRunner.

@@ -251,7 +251,9 @@ public void onFailure(Throwable t)
{
submitNoticeToExec(new StatusNotice(task, TaskStatus.failure(task.getId())));
}
}
},
// The callback is non-blocking and quick, so it's OK to schedule it using directExecutor()
Member

Please add a javadoc comment to submitNoticeToExec() like "This method is mostly non-blocking because exec is a ThreadPoolExecutor with unbounded queue used by default."

Correspondingly, this comment should also say "mostly non-blocking", not just "non-blocking".
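A sketch of what the suggested javadoc claims, assuming (as the comment states, not verified against the real class) that exec is a ThreadPoolExecutor with an unbounded LinkedBlockingQueue; NoticeSubmitter is a hypothetical stand-in with a single worker. While that worker is busy, further execute() calls only enqueue and return.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class NoticeSubmitter {
  /** One worker thread; a LinkedBlockingQueue created without a capacity is unbounded. */
  private final ThreadPoolExecutor exec =
      new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

  /**
   * This method is mostly non-blocking because {@link #exec} is a {@link ThreadPoolExecutor}
   * with an unbounded queue: execute() only enqueues the notice, even while the worker is busy.
   * It may still briefly contend for the queue's internal lock.
   */
  void submitNoticeToExec(Runnable notice) {
    exec.execute(notice);
  }

  /** Occupies the only worker, submits n more notices, and returns how many are queued. */
  static int queuedWhileWorkerBusy(int n) {
    NoticeSubmitter submitter = new NoticeSubmitter();
    CountDownLatch workerBusy = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    try {
      submitter.submitNoticeToExec(() -> {
        workerBusy.countDown();
        try {
          release.await(); // keep the only worker occupied
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      workerBusy.await();
      for (int i = 0; i < n; i++) {
        submitter.submitNoticeToExec(() -> { }); // returns at once despite the busy worker
      }
      return submitter.exec.getQueue().size();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      release.countDown();
      submitter.exec.shutdown();
    }
  }
}
```

With a bounded queue, ThreadPoolExecutor.execute() still does not wait for space: once the queue is full and the maximum pool size is reached, the task goes to the RejectedExecutionHandler (AbortPolicy by default, which throws RejectedExecutionException). The unbounded queue is what lets submission succeed as long as the executor is not shut down.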

@@ -192,7 +196,8 @@ public void onFailure(Throwable th)
log.debug(ex, "Request timed out or closed already.");
}
}
}
},
Member

There seems to be some unusual asynchrony going on in this method, so it needs more comments.

@@ -205,7 +209,8 @@ public void onFailure(Throwable th)
log.debug(ex, "Request timed out or closed already.");
}
}
}
},
Member

Same. This code looks similar to TaskManagementResource. Maybe the common logic should be extracted. Alternatively, both classes should have comments noting that they are written in a similar way, so changes, fixes, and refactorings should be applied to both of them at the same time.

Contributor Author

Extracting the common logic of TaskManagementResource and SegmentListerResource and refactoring them needs more work; I will file a separate PR for it.
In this PR, I will add comments to both classes.

Member

Ok, but the comments that you left right here don't make anything clearer. It's ok to omit them in this PR and instead write better comments as part of that PR that you planned, though.

Contributor Author

sure

@@ -560,6 +560,7 @@ private void handleStatus(final TaskStatus status)
}
}
},
// Using dedicated statusHandler executor instead of directExecutor() because the callback's onSuccess() is not trivial and because the statusFuture is completed in some encapsulated thread pool in TaskRunner; directExecutor() may create operational instability and subtle dependency between components here. See https://github.com/code-review-checklists/java-concurrency#cf-beware-non-async for details
Member

Line longer than 120 columns

@@ -177,6 +177,9 @@ public void stop() throws Exception
return completedTasks;
}

/**
* This method is mostly non-blocking because exec is a ThreadPoolExecutor with unbounded queue used by default.
Member

Please make exec and ThreadPoolExecutor javadoc links

@@ -257,7 +257,7 @@ public void onFailure(Throwable throwable)
}
}
},
// The callback is non-blocking and quick, so it's OK to schedule it using directExecutor()
// The callback is mostly non-blocking and quick, so it's OK to schedule it using directExecutor()
Member

@leventov leventov Nov 16, 2019

In my comment, I wrote "you may argue that... this code is mostly non-blocking"; I didn't state that it is (and I actually don't know). That is, the claim that the synchronized block in onSuccess() "is mostly non-blocking" requires some proof, expressed in a comment. This code excerpt in GitHub's interface does not contain enough information to make such a proof (or disproof). A wider context (specifically, the code which synchronizes on waitingForMonitor and waits) should be considered.

Contributor Author

Got it. The synchronization on waitingForMonitor involves a while loop, so I will use a dedicated thread.
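A sketch of the "dedicated thread" resolution for the waitingForMonitor case, under assumptions: the real waiting loop was not part of the excerpt, so the waiter side below is a hypothetical minimal guarded-wait loop, and CompletableFuture again stands in for the Guava API. onSuccess() is handed to a dedicated monitorSyncHandler executor, so the thread that completes the future never has to contend for the monitor itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MonitorCallbackOnDedicatedThread {
  private final Object waitingForMonitor = new Object();
  private boolean done = false; // guarded by waitingForMonitor
  private String notifiedBy;    // guarded by waitingForMonitor

  /** Waiter side: the while loop re-checks the condition after every wakeup. */
  String awaitDone() throws InterruptedException {
    synchronized (waitingForMonitor) {
      while (!done) {
        waitingForMonitor.wait();
      }
      return notifiedBy;
    }
  }

  /** Callback body: entering the synchronized block blocks while another thread holds the monitor. */
  void onSuccess() {
    synchronized (waitingForMonitor) {
      done = true;
      notifiedBy = Thread.currentThread().getName();
      waitingForMonitor.notifyAll();
    }
  }

  /** Runs onSuccess() on a dedicated executor and returns the name of the thread that ran it. */
  static String runOnDedicatedThread() {
    ExecutorService monitorSyncHandler =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "monitor-sync-handler"));
    MonitorCallbackOnDedicatedThread state = new MonitorCallbackOnDedicatedThread();
    try {
      CompletableFuture<String> future = new CompletableFuture<>();
      // Dedicated executor instead of direct execution: the completing thread only enqueues.
      future.thenRunAsync(state::onSuccess, monitorSyncHandler);
      future.complete("SUCCESS");
      return state.awaitDone();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      monitorSyncHandler.shutdown();
    }
  }
}
```

In this toy, wait() releases the monitor, so contention is brief; the concern in the real code is any path that holds waitingForMonitor for longer, which a direct callback would end up waiting on.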

@zhenxiao
Contributor Author

Thank you, @leventov.
Comments addressed.

@zhenxiao
Contributor Author

zhenxiao commented Dec 3, 2019

Hi @leventov, I rebased this PR to resolve merge conflicts. Could you take a look when you're free?

@@ -175,6 +175,7 @@

private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
private final ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy;
private final ExecutorService monitorSyncHandler;
Member

This extra service must be explicitly shut down in stop()/close(); see https://github.com/code-review-checklists/java-concurrency#explicit-shutdown. This applies to other classes in this PR as well.
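A minimal sketch of the explicit-shutdown rule for a class that creates its own executor (the class is hypothetical; the field name mirrors monitorSyncHandler above). stop() follows the shutdown-then-awaitTermination pattern from the ExecutorService javadoc, falling back to shutdownNow() if callbacks don't finish in time; the 10-second timeout is an arbitrary choice.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class RunnerWithOwnedExecutor {
  private final ExecutorService monitorSyncHandler = Executors.newSingleThreadExecutor();

  void submitCallback(Runnable callback) {
    monitorSyncHandler.execute(callback);
  }

  /** Explicitly shuts down the executor this class created, so its thread does not leak. */
  void stop() {
    monitorSyncHandler.shutdown(); // stop accepting new callbacks; queued ones still run
    try {
      if (!monitorSyncHandler.awaitTermination(10, TimeUnit.SECONDS)) {
        monitorSyncHandler.shutdownNow(); // interrupt callbacks that are still running
      }
    } catch (InterruptedException e) {
      monitorSyncHandler.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }

  boolean isTerminated() {
    return monitorSyncHandler.isTerminated();
  }
}
```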

Contributor Author

Thank you, fixed.

@@ -483,7 +485,8 @@ public void onFailure(Throwable e)
log.warn(e, "Failed to push [%,d] segments.", segmentsToPush.size());
errorHandler.apply(e);
}
}
},
scheduledExecutor
Member

This still needs to be reviewed by maintainers of AppenderatorPlumber

@@ -350,6 +350,7 @@ public void stop()
return;
}
try {
monitorSyncHandler.shutdown();
Member

These two things must be closed using the closer below

@@ -219,6 +219,7 @@ public void stop()
tasks.clear();
taskFutures.clear();
active = false;
statusHandler.shutdownNow();
Member

Please shut down these three Executors using a Closer.
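A sketch of shutting several executors down from one place. Druid has its own Closer class, whose exact API is not reproduced here; SimpleCloser below is a hypothetical minimal stand-in that closes resources in reverse registration order and keeps going if one close() fails, attaching later failures as suppressed exceptions. Only statusHandler comes from the diff; the other two executor names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CloseExecutorsTogether {
  /** Hypothetical minimal Closer-like helper (not Druid's class): closes in reverse order. */
  static final class SimpleCloser implements AutoCloseable {
    private final Deque<AutoCloseable> resources = new ArrayDeque<>();

    void register(AutoCloseable resource) {
      resources.push(resource);
    }

    @Override
    public void close() throws Exception {
      Exception first = null;
      while (!resources.isEmpty()) {
        try {
          resources.pop().close(); // keep closing the rest even if one close() fails
        } catch (Exception e) {
          if (first == null) {
            first = e;
          } else {
            first.addSuppressed(e);
          }
        }
      }
      if (first != null) {
        throw first;
      }
    }
  }

  /** Creates three executors, registers each for shutdownNow(), and closes them together. */
  static boolean stopClosesAll() {
    ExecutorService statusHandler = Executors.newSingleThreadExecutor();
    ExecutorService executorB = Executors.newSingleThreadExecutor(); // hypothetical
    ExecutorService executorC = Executors.newSingleThreadExecutor(); // hypothetical
    SimpleCloser closer = new SimpleCloser();
    for (ExecutorService exec : new ExecutorService[] {statusHandler, executorB, executorC}) {
      closer.register(exec::shutdownNow); // the List<Runnable> result is discarded
    }
    try {
      closer.close();
    } catch (Exception e) {
      throw new IllegalStateException(e);
    }
    return statusHandler.isShutdown() && executorB.isShutdown() && executorC.isShutdown();
  }
}
```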

@@ -219,6 +221,7 @@
private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new ArrayList<>();

private final LockGranularity lockGranularityToUse;
private final ExecutorService sequencePersistExecutor;
Member

Classes that don't have close()/stop() methods yet, like SeekableStreamIndexTaskRunner, should get them if they are going to manage an executor.

If the class is fairly ephemeral (e.g., many instances of it are routinely started and stopped), it is strongly preferable to create the executor one level up the stack and reuse it for all instances of the class (or for all instances within a certain domain, e.g., all SeekableStreamIndexTaskRunners associated with a single datasource, or something like that).

Note again: I'm not claiming that SeekableStreamIndexTaskRunner falls into this category. Please figure it out yourself, or consult the maintainers of this subsystem.
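A sketch of the "one level up the stack" option, with hypothetical names: a long-lived RunnerFactory (imagine one per datasource) owns and shuts down a single pool, and each short-lived EphemeralRunner only borrows it. Whether SeekableStreamIndexTaskRunner actually fits this pattern is, as noted above, for the subsystem's maintainers to decide.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SharedExecutorOneLevelUp {
  /** Short-lived runner: uses, but does not own, the executor it is given. */
  static final class EphemeralRunner {
    private final ExecutorService sharedExec;

    EphemeralRunner(ExecutorService sharedExec) {
      this.sharedExec = sharedExec;
    }

    CompletableFuture<Void> runCallback(Runnable callback) {
      return CompletableFuture.runAsync(callback, sharedExec);
    }
    // No executor shutdown here: the owner one level up is responsible for it.
  }

  /** Long-lived owner that creates the pool once and shuts it down once. */
  static final class RunnerFactory implements AutoCloseable {
    private final ExecutorService sharedExec = Executors.newFixedThreadPool(2);

    EphemeralRunner newRunner() {
      return new EphemeralRunner(sharedExec);
    }

    @Override
    public void close() {
      sharedExec.shutdown();
    }
  }

  /** Creates many runners and returns how many distinct threads ran their callbacks. */
  static int distinctThreads(int runners) {
    Set<String> threadNames = ConcurrentHashMap.newKeySet();
    try (RunnerFactory factory = new RunnerFactory()) {
      CompletableFuture<?>[] futures = new CompletableFuture<?>[runners];
      for (int i = 0; i < runners; i++) {
        futures[i] = factory.newRunner()
            .runCallback(() -> threadNames.add(Thread.currentThread().getName()));
      }
      CompletableFuture.allOf(futures).join();
    }
    return threadNames.size();
  }
}
```

However many runners are created, the number of threads stays bounded by the shared pool's size (two here), and only the factory needs a close() method.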

@github-actions

github-actions bot commented Aug 3, 2023

This pull request has been marked as stale due to 60 days of inactivity.
It will be closed in 4 weeks if no further activity occurs. If you think
that's incorrect or this pull request should instead be reviewed, please simply
write any comment. Even if closed, you can still revive the PR at any time or
discuss it on the dev@druid.apache.org list.
Thank you for your contributions.

@github-actions github-actions bot added the stale label Aug 3, 2023
@github-actions

github-actions bot commented Sep 1, 2023

This pull request/issue has been closed due to lack of activity. If you think that
is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Sep 1, 2023
Successfully merging this pull request may close these issues.

Prohibit Futures.addCallback(Future, Callback) (i. e. using directExecutor() implicitly) using forbidden-apis
3 participants