Prohibit Futures.addCallback(Future, Callback) #8809
Mechanically adding
Then, the specific criterion should be identified in a comment, similarly to what I asked you to do here: #8700 (comment). Otherwise (if the case meets neither of these criteria),
A forbidden-apis entry should also be added. See an example here: 15fbf59#diff-4cfb6baf06c34e537a8666d9a0f3aebd
@@ -250,7 +254,8 @@ public void onFailure(Throwable t)
LOG.error(t, "Error while running a task for subTaskSpec[%s]", spec);
taskCompleteEvents.offer(SubTaskCompleteEvent.fail(spec, t));
}
}
},
blockingQueueHandler
taskCompleteEvents.offer(completeEvent); -- BlockingQueue.offer() is actually non-blocking
Correction - BlockingQueue.offer() still needs to enter a critical section, but it may be considered "mostly non-blocking", so directExecutor() should still be OK
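To make the "mostly non-blocking" point concrete, here is a minimal JDK-only sketch of how offer() behaves. The class name OfferDemo and the queue types are assumptions for illustration; the actual type of taskCompleteEvents is not visible in this excerpt.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the PR.
final class OfferDemo
{
  /** Offers to an unbounded queue: succeeds without waiting for space. */
  static boolean offerToUnbounded()
  {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    return queue.offer("event");
  }

  /** Offers to a full bounded queue: returns false right away instead of waiting. */
  static boolean offerToFullBounded()
  {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    queue.offer("first");
    return queue.offer("second");
  }

  public static void main(String[] args)
  {
    System.out.println(offerToUnbounded());
    System.out.println(offerToFullBounded());
  }
}
```

In both cases the caller never waits for capacity; the only wait is for the queue's internal lock, which is held briefly.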
@@ -61,15 +61,19 @@ public void onFailure(Throwable t)
{
finalFuture.setException(t);
}
});
},
// The callback is non-blocking and quick, so it's OK to schedule it using directExecutor()
Too much indentation
It seems this is the indentation suggested when compiling the code. Anyway, let me refactor to make it more readable.
}

@Override
public void onFailure(Throwable t)
{
finalFuture.setException(t);
}
});
},
// The callback is non-blocking and quick, so it's OK to schedule it using directExecutor()
Same
@@ -256,7 +256,9 @@ public void onFailure(Throwable throwable)
waitingForMonitor.notifyAll();
}
}
}
},
// The callback is non-blocking and quick, so it's OK to schedule it using directExecutor()
synchronized (waitingForMonitor) is blocking. You may argue that this lock is (almost) free, for some reasons, so then the callback should still be considered "mostly non-blocking" and OK to schedule on directExecutor(), but in any case it demands a more elaborate comment
@@ -556,7 +559,8 @@ private void handleStatus(final TaskStatus status)
.emit();
}
}
}
},
statusHandler
It's non-obvious from this point in the code that attachCallbacks() is called from a different thread pool than the one in which statusFuture is completed.
There are options to improve:
- Propagate an Executor argument and pass statusHandler only in manage(), and, additionally, amend the javadoc comment for manage() noting that it is run from {@link #managerExec}.
- Just add a more elaborate comment right here.
In either case, it will be non-obvious for readers why different executors demand a special executor. So the comment should look something like this:
"Using dedicated statusHandler executor instead of directExecutor() because the callback's onSuccess() is not trivial and because the statusFuture is completed in some encapsulated thread pool in TaskRunner; directExecutor() may create operational instability and subtle dependency between components here. See https://github.com/code-review-checklists/java-concurrency#cf-beware-non-async for details."
Something similar should be done in all other places where you use a custom executor instead of directExecutor().
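The difference between the two scheduling choices can be sketched with the JDK alone. This is an analogy, not the PR's code: CompletableFuture.thenApply() stands in for a Guava callback on directExecutor(), thenApplyAsync(..., executor) stands in for a callback on a dedicated executor, and the class name and thread names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the PR.
final class CallbackThreadDemo
{
  /** Returns the thread names that ran the direct-style and the dedicated callback. */
  static String[] run()
  {
    ExecutorService producer = Executors.newSingleThreadExecutor(r -> new Thread(r, "producer"));
    ExecutorService statusHandler = Executors.newSingleThreadExecutor(r -> new Thread(r, "statusHandler"));
    try {
      CompletableFuture<String> statusFuture = new CompletableFuture<>();
      // Direct style: the callback runs on whichever thread completes statusFuture.
      CompletableFuture<String> direct =
          statusFuture.thenApply(status -> Thread.currentThread().getName());
      // Dedicated style: the callback is handed off to statusHandler.
      CompletableFuture<String> dedicated =
          statusFuture.thenApplyAsync(status -> Thread.currentThread().getName(), statusHandler);
      // The future is completed inside some other component's thread pool.
      producer.execute(() -> statusFuture.complete("SUCCESS"));
      return new String[]{direct.join(), dedicated.join()};
    }
    finally {
      producer.shutdownNow();
      statusHandler.shutdownNow();
    }
  }

  public static void main(String[] args)
  {
    String[] names = run();
    System.out.println("direct ran on " + names[0] + ", dedicated ran on " + names[1]);
  }
}
```

With the direct style, any non-trivial work in the callback is paid for by the producer's thread, which is the hidden coupling the suggested comment warns about.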
@@ -1008,7 +1014,8 @@ public void onFailure(Throwable t)
log.error(t, "Error while publishing segments for sequenceNumber[%s]", sequenceMetadata);
handoffFuture.setException(t);
}
}
},
sequencePersistExecutor
I withdraw from analyzing execution flows and thread pool relationships here. @jihoonson could you please vet this?
Sure, I will take a look soon.
This still needs to be reviewed by maintainers of SeekableStreamIndexTaskRunner.
@@ -251,7 +251,9 @@ public void onFailure(Throwable t)
{
submitNoticeToExec(new StatusNotice(task, TaskStatus.failure(task.getId())));
}
}
},
// The callback is non-blocking and quick, so it's OK to schedule it using directExecutor()
Please add a javadoc comment to submitNoticeToExec() like "This method is mostly non-blocking because exec is a ThreadPoolExecutor with unbounded queue used by default."
Correspondingly, this comment also should say "mostly non-blocking", not just "non-blocking".
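A small sketch of why submission to such an executor is "mostly non-blocking": with an unbounded LinkedBlockingQueue, execute() enqueues and returns even while the only worker is busy. The class name and the single-thread pool size are assumptions; how exec is actually configured is not shown in this excerpt.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the PR.
final class UnboundedQueueDemo
{
  /** Submits tasks while the only worker is busy and returns how many were queued. */
  static int queuedWhileWorkerBusy(int tasks)
  {
    ThreadPoolExecutor exec =
        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    CountDownLatch workerStarted = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    try {
      // Occupy the single worker thread until we release it.
      exec.execute(() -> {
        workerStarted.countDown();
        awaitQuietly(release);
      });
      awaitQuietly(workerStarted);
      // None of these calls waits for the worker; each task is only enqueued.
      for (int i = 0; i < tasks; i++) {
        exec.execute(() -> { });
      }
      return exec.getQueue().size();
    }
    finally {
      release.countDown();
      exec.shutdown();
    }
  }

  private static void awaitQuietly(CountDownLatch latch)
  {
    try {
      latch.await();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

The trade-off is that the queue can grow without limit, so the caller is never slowed down but memory use is unbounded.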
@@ -192,7 +196,8 @@ public void onFailure(Throwable th)
log.debug(ex, "Request timed out or closed already.");
}
}
}
},
Some weird asynchrony going on in this method (at least it looks so), so it demands more comments
@@ -205,7 +209,8 @@ public void onFailure(Throwable th)
log.debug(ex, "Request timed out or closed already.");
}
}
}
},
Same. This code looks similar to TaskManagementResource. Maybe the common logic should be extracted. Or, there should be comments in both of them noting that these classes are written in a similar way, so changes, fixes, and refactorings should be applied to both classes at the same time.
Extracting and refactoring TaskManagementResource and SegmentListerResource needs more work; I will file a separate PR to do it.
In this PR, I will add comments to both classes at the same time.
Ok, but the comments that you left right here don't make anything clearer. It's ok to omit them in this PR and instead write better comments as part of that PR that you planned, though.
sure
@@ -560,6 +560,7 @@ private void handleStatus(final TaskStatus status)
}
}
},
//Using dedicated statusHandler executor instead of directExecutor() because the callback's onSuccess() is not trivial and because the statusFuture is completed in some incapsulated thread pool in TaskRunner; directExecutor() may create operational instability and subtle dependency between components here. See https://github.com/code-review-checklists/java-concurrency#cf-beware-non-async for details
Line longer than 120 columns
@@ -177,6 +177,9 @@ public void stop() throws Exception
return completedTasks;
}

/**
* This method is mostly non-blocking because exec is a ThreadPoolExecutor with unbounded queue used by default.
Please make exec and ThreadPoolExecutor javadoc links
@@ -257,7 +257,7 @@ public void onFailure(Throwable throwable)
}
}
},
// The callback is non-blocking and quick, so it's OK to schedule it using directExecutor()
// The callback is mostly non-blocking and quick, so it's OK to schedule it using directExecutor()
In my comment, I wrote "you may argue that... this code is mostly non-blocking"; I didn't state that (and I actually don't know). I.e. the claim that the synchronized block in onSuccess() "is mostly non-blocking" requires some proof, expressed in a comment. There is not enough information to make such a proof (or disproof) just from this code excerpt in GitHub's interface. A wider context (specifically, the code which synchronizes on waitingForMonitor and waits) should be considered.
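As a rough illustration of what such a proof has to rule out, the JDK-only sketch below shows a direct-style callback stalling the completing thread when another thread holds the monitor. It is an analogy with hypothetical names: thenRun() stands in for a callback on directExecutor(), and the real code that synchronizes on waitingForMonitor is not shown in this excerpt.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the PR.
final class MonitorStallDemo
{
  /** Returns the completing thread's state observed while another thread holds the monitor. */
  static Thread.State completerStateWhileMonitorHeld()
  {
    final Object waitingForMonitor = new Object();
    final CompletableFuture<String> future = new CompletableFuture<>();

    // Non-async dependent: runs on the thread that completes the future.
    future.thenRun(() -> {
      synchronized (waitingForMonitor) {
        waitingForMonitor.notifyAll();
      }
    });

    Thread completer = new Thread(() -> future.complete("done"), "completer");
    Thread.State observed;
    synchronized (waitingForMonitor) {
      completer.start();
      long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
      // Poll until the completer is stuck at the synchronized block (or give up).
      while (completer.getState() != Thread.State.BLOCKED && System.nanoTime() < deadline) {
        Thread.yield();
      }
      observed = completer.getState();
    }
    try {
      completer.join();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return observed;
  }

  public static void main(String[] args)
  {
    System.out.println(completerStateWhileMonitorHeld());
  }
}
```

How long the stall lasts depends entirely on how long other code keeps the monitor, which is why the wider context matters.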
Got it. The synchronization on waitingForMonitor involves a while loop. I will use a dedicated thread.
Thank you, @leventov
Hi @leventov, I rebased this PR to resolve code conflicts. Are you free to take a look?
@@ -175,6 +175,7 @@

private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
private final ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy;
private final ExecutorService monitorSyncHandler;
This extra service must be explicitly shut down in stop()/close(), see https://github.com/code-review-checklists/java-concurrency#explicit-shutdown. This applies to other classes in this PR as well.
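A minimal sketch of the explicit-shutdown pattern, assuming a class that owns its executor. LifecycleDemo and its methods are hypothetical; only the field name monitorSyncHandler comes from the diff.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the PR.
final class LifecycleDemo
{
  private final ExecutorService monitorSyncHandler = Executors.newSingleThreadExecutor();

  void start()
  {
    monitorSyncHandler.execute(() -> { });
  }

  /** Shuts the executor down and waits briefly; returns true if it fully terminated. */
  boolean stop()
  {
    monitorSyncHandler.shutdownNow();
    try {
      return monitorSyncHandler.awaitTermination(5, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  static boolean startAndStop()
  {
    LifecycleDemo service = new LifecycleDemo();
    service.start();
    return service.stop();
  }
}
```

Without the call in stop(), the executor's worker thread would outlive the object that created it.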
thank you, fixed.
@@ -483,7 +485,8 @@ public void onFailure(Throwable e)
log.warn(e, "Failed to push [%,d] segments.", segmentsToPush.size());
errorHandler.apply(e);
}
}
},
scheduledExecutor
This still needs to be reviewed by maintainers of AppenderatorPlumber
@@ -350,6 +350,7 @@ public void stop()
return;
}
try {
monitorSyncHandler.shutdown();
These two things must be closed using the closer below
@@ -219,6 +219,7 @@ public void stop()
tasks.clear();
taskFutures.clear();
active = false;
statusHandler.shutdownNow();
Please shut down these three Executors using a Closer
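The point of a closer is that one failing resource does not prevent the others from being released. The sketch below is a simplified JDK-only stand-in, not Guava's Closer and not the PR's code; SimpleCloser and its behavior on failure are assumptions for illustration.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, simplified stand-in for a closer utility.
final class SimpleCloser implements Closeable
{
  private final Deque<Closeable> resources = new ArrayDeque<>();

  void register(Closeable closeable)
  {
    resources.push(closeable);
  }

  /** Closes everything in reverse registration order, even if some close() calls fail. */
  @Override
  public void close()
  {
    Exception first = null;
    while (!resources.isEmpty()) {
      try {
        resources.pop().close();
      }
      catch (Exception e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      }
    }
    if (first != null) {
      throw new IllegalStateException("Failed to close all resources", first);
    }
  }

  /** Registers three executors plus one failing resource; returns true if all executors were shut down. */
  static boolean shutsDownAllExecutors()
  {
    ExecutorService a = Executors.newSingleThreadExecutor();
    ExecutorService b = Executors.newSingleThreadExecutor();
    ExecutorService c = Executors.newSingleThreadExecutor();
    SimpleCloser closer = new SimpleCloser();
    closer.register(a::shutdownNow);
    closer.register(() -> {
      throw new IOException("simulated failure");
    });
    closer.register(b::shutdownNow);
    closer.register(c::shutdownNow);
    try {
      closer.close();
    }
    catch (IllegalStateException expected) {
      // The simulated failure is reported only after every resource was attempted.
    }
    return a.isShutdown() && b.isShutdown() && c.isShutdown();
  }
}
```

Calling shutdownNow() on three executors in sequence inside stop() gives no such guarantee if an earlier step throws.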
@@ -219,6 +221,7 @@
private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new ArrayList<>();

private final LockGranularity lockGranularityToUse;
private final ExecutorService sequencePersistExecutor;
In classes that don't have close()/stop() methods yet, like SeekableStreamIndexTaskRunner, they should be introduced if you want to manage an executor.
If the class is pretty ephemeral (e.g. many instances of this class routinely start and stop), it should be strongly preferred to create an executor one level up the stack and reuse it for all instances of the class (or all instances of the class within a certain domain, e.g. all SeekableStreamIndexTaskRunners associated with a single datasource, or something like that).
Note again, I'm not actually telling you that the class SeekableStreamIndexTaskRunner falls into this category - please figure it out yourself, or consult the maintainers of this subsystem.
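The "one level up the stack" option could look roughly like the sketch below. All names are hypothetical, and whether this fits SeekableStreamIndexTaskRunner is exactly the open question above: a longer-lived owner creates and shuts down the executor, while short-lived runners only borrow it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo classes, not part of the PR.
final class SharedExecutorDemo
{
  /** Short-lived object: borrows the executor and never shuts it down. */
  static final class Runner
  {
    private final ExecutorService persistExecutor;

    Runner(ExecutorService persistExecutor)
    {
      this.persistExecutor = persistExecutor;
    }

    CompletableFuture<Integer> persist(int value)
    {
      return CompletableFuture.supplyAsync(() -> value * 2, persistExecutor);
    }
  }

  /** Longer-lived owner: creates the executor once and is responsible for shutting it down. */
  static final class RunnerFactory implements AutoCloseable
  {
    private final ExecutorService shared = Executors.newFixedThreadPool(2);

    Runner create()
    {
      return new Runner(shared);
    }

    @Override
    public void close()
    {
      shared.shutdownNow();
    }
  }

  /** Creates three runners that share one executor and sums their results. */
  static int run()
  {
    int sum = 0;
    try (RunnerFactory factory = new RunnerFactory()) {
      for (int i = 1; i <= 3; i++) {
        sum += factory.create().persist(i).join();
      }
    }
    return sum;
  }
}
```

This avoids creating and tearing down a thread pool per instance, at the cost of one more constructor argument.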
This pull request has been marked as stale due to 60 days of inactivity.
This pull request/issue has been closed due to lack of activity. If you think that
Fixes #8756
@leventov