2.x: internal API to get distinct Workers from some Schedulers #5741

Merged 1 commit on Nov 27, 2017


akarnokd
Member

This PR adds an internal interface SchedulerMultiWorkerSupport that allows retrieving multiple workers from a Scheduler that implements this interface.

The standard Scheduler.createWorker() can be invoked as many times as necessary, but specific implementations such as the computation() Scheduler and the ParallelScheduler are not guaranteed to return workers that are backed by distinct single-threaded thread pools.

This does not affect other scheduler types because they either:

  • are single-threaded (single()) or don't use threads at all (trampoline()), or
  • already hand out distinct workers (io(), newThread()).

Such worker reuse can happen in a highly concurrent application where typical tasks are mixed with parallel tasks and both pull workers from these Schedulers. If this happens, the parallel operations may end up running on duplicate threads and thus fall short of the originally intended parallelism level.

By implementing the proposed interface, Schedulers can support batch retrieval and make sure the caller gets as many distinct thread pools as possible. If more workers are requested than the Scheduler's parallelism, the workers are handed out in round-robin fashion, similar to the standard createWorker().
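The batch retrieval described above can be sketched roughly as follows. This is a simplified, self-contained model rather than the code merged in this PR: `Worker` and `FixedPool` are hypothetical stand-ins for `Scheduler.Worker` and the scheduler's internal pool, the `next` cursor is not thread-safe, and `cores` is assumed to be greater than zero.

```java
import java.util.ArrayList;
import java.util.List;

public class MultiWorkerSketch {

    /** Hypothetical stand-in for Scheduler.Worker; it only records which pool backs it. */
    static final class Worker {
        final int poolId;
        Worker(int poolId) { this.poolId = poolId; }
    }

    /** Mirrors the shape of the interface discussed in this PR. */
    interface SchedulerMultiWorkerSupport {
        void createWorkers(int number, WorkerCallback callback);

        interface WorkerCallback {
            void onWorker(int index, Worker worker);
        }
    }

    /** Hypothetical fixed-size pool that hands out workers round-robin. */
    static final class FixedPool implements SchedulerMultiWorkerSupport {
        final int cores; // assumed > 0 in this sketch
        int next;        // pool at which the next batch starts

        FixedPool(int cores) { this.cores = cores; }

        @Override
        public void createWorkers(int number, WorkerCallback callback) {
            int c = cores;
            int index = next % c;
            for (int i = 0; i < number; i++) {
                // consecutive workers in one batch come from consecutive pools
                callback.onWorker(i, new Worker(index));
                if (++index == c) {
                    index = 0; // wrap around once every pool has been used
                }
            }
            next = index;
        }
    }

    /** Collects the pool ids handed out by one batch request. */
    static List<Integer> poolIds(FixedPool pool, int number) {
        List<Integer> ids = new ArrayList<>();
        pool.createWorkers(number, (index, worker) -> ids.add(worker.poolId));
        return ids;
    }

    public static void main(String[] args) {
        FixedPool pool = new FixedPool(4);
        System.out.println(poolIds(pool, 4)); // four workers, four distinct pools
        System.out.println(poolIds(pool, 6)); // more than 4 requested: pools repeat
    }
}
```

In this model, a request for at most `cores` workers always yields distinct pools, whatever position the cursor was left at by earlier requests.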

Why a callback instead of returning an array?

  • even if both require an allocation to set up, there is no need to have all workers visible at once,
  • simply less memory usage, and
  • avoids looping twice: once for filling in the array and once for using the elements of the array.
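To illustrate the last point, here is a small comparison of the two API shapes. It is only a sketch: workers are modeled as plain strings, and `createWorkersAsArray` is a hypothetical alternative that this PR does not add.

```java
import java.util.ArrayList;
import java.util.List;

public class CallbackVsArraySketch {

    interface WorkerCallback {
        void onWorker(int index, String worker);
    }

    /** Hypothetical array-returning variant: the first loop fills the array. */
    static String[] createWorkersAsArray(int number) {
        String[] workers = new String[number];
        for (int i = 0; i < number; i++) {
            workers[i] = "worker-" + i;
        }
        return workers;
    }

    /** Callback variant: each worker is handed over as soon as it exists. */
    static void createWorkers(int number, WorkerCallback callback) {
        for (int i = 0; i < number; i++) {
            callback.onWorker(i, "worker-" + i);
        }
    }

    static List<String> useViaArray(int number) {
        List<String> used = new ArrayList<>();
        // the caller needs a second loop to consume the array
        for (String worker : createWorkersAsArray(number)) {
            used.add(worker);
        }
        return used;
    }

    static List<String> useViaCallback(int number) {
        List<String> used = new ArrayList<>();
        // no intermediate array; the producer's loop is the only loop
        createWorkers(number, (index, worker) -> used.add(worker));
        return used;
    }

    public static void main(String[] args) {
        System.out.println(useViaArray(3));
        System.out.println(useViaCallback(3));
    }
}
```

Both variants deliver the same workers in the same order; the callback form just skips the intermediate array and the second pass over it.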

@akarnokd akarnokd added this to the 2.2 milestone Nov 24, 2017
@codecov

codecov bot commented Nov 24, 2017

Codecov Report

Merging #5741 into 2.x will decrease coverage by 0.11%.
The diff coverage is 100%.

@@             Coverage Diff              @@
##                2.x    #5741      +/-   ##
============================================
- Coverage     96.29%   96.18%   -0.12%     
  Complexity     5833     5833              
============================================
  Files           634      634              
  Lines         41615    41639      +24     
  Branches       5761     5766       +5     
============================================
- Hits          40073    40050      -23     
- Misses          611      640      +29     
- Partials        931      949      +18

| Impacted Files | Coverage Δ | Complexity Δ |
|---|---|---|
| ...vex/internal/operators/parallel/ParallelRunOn.java | 97.1% <100%> (-0.37%) | 8 <2> (+2) |
| ...ivex/internal/schedulers/ComputationScheduler.java | 97.53% <100%> (+0.51%) | 14 <1> (+1) ⬆️ |
| .../operators/observable/ObservableFlatMapSingle.java | 88.8% <0%> (-5.98%) | 2% <0%> (ø) |
| ...nternal/operators/observable/ObservableCreate.java | 92.3% <0%> (-5.13%) | 2% <0%> (ø) |
| ...ava/io/reactivex/processors/BehaviorProcessor.java | 89.38% <0%> (-4.87%) | 61% <0%> (ø) |
| ...tivex/internal/observers/FutureSingleObserver.java | 94.33% <0%> (-3.78%) | 24% <0%> (-1%) |
| ...rnal/operators/observable/ObservableObserveOn.java | 97.95% <0%> (-2.05%) | 3% <0%> (ø) |
| ...ernal/operators/flowable/FlowableFlatMapMaybe.java | 93.23% <0%> (-1.94%) | 2% <0%> (ø) |
| ...main/java/io/reactivex/subjects/ReplaySubject.java | 96.25% <0%> (-1.77%) | 50% <0%> (-1%) |
| ...rnal/operators/flowable/FlowableFlatMapSingle.java | 91.3% <0%> (-1.64%) | 2% <0%> (ø) |
| ... and 22 more | | |

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update e25be7c...a787346. Read the comment docs.

Contributor

@artem-zinnatullin artem-zinnatullin left a comment

Rename PR to "Internal API to …"?


@Override
public void createWorkers(int number, WorkerCallback callback) {
int c = cores;
Contributor

cores is final in this context, why copy it to local var?

Member Author

With potential volatiles around, this won't re-read the field every time it is needed in the loop below.

* The callback interface for the {@link SchedulerMultiWorkerSupport#createWorkers(int, WorkerCallback)}
* method.
*/
interface WorkerCallback {
Contributor

WorkerCreatedCallback?

Member Author

@akarnokd akarnokd Nov 27, 2017

While it is still an internal API, I don't think we need more complicated naming.

* @param index the worker index, zero-based
* @param worker the worker instance
*/
void onWorker(int index, @NonNull Scheduler.Worker worker);
Contributor

onCreated?

Collaborator

What's the use case for the index?

Member Author

In ParallelRunOn, it lets us index into the parent and subscribers arrays without the need for additional state, such as counting how many times onWorker was invoked.

Collaborator

ok, potentially reduces allocations for the user, thanks
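
The use of the index described in the reply above might look roughly like this. It is a sketch only, not the ParallelRunOn source: subscribers and workers are modeled as strings, and `PairingCallback` and `pair` are hypothetical names.

```java
public class IndexedCallbackSketch {

    interface WorkerCallback {
        void onWorker(int index, String worker);
    }

    /** Stand-in for a scheduler's batch creation; workers are just labels here. */
    static void createWorkers(int number, WorkerCallback callback) {
        for (int i = 0; i < number; i++) {
            callback.onWorker(i, "worker-" + i);
        }
    }

    /** Stateless callback: the index alone selects the slot in both arrays. */
    static final class PairingCallback implements WorkerCallback {
        final String[] subscribers;
        final String[] parents;

        PairingCallback(String[] subscribers, String[] parents) {
            this.subscribers = subscribers;
            this.parents = parents;
        }

        @Override
        public void onWorker(int index, String worker) {
            // no invocation counter needed; the producer supplies the position
            parents[index] = subscribers[index] + " on " + worker;
        }
    }

    static String[] pair(String[] subscribers) {
        String[] parents = new String[subscribers.length];
        createWorkers(subscribers.length, new PairingCallback(subscribers, parents));
        return parents;
    }

    public static void main(String[] args) {
        for (String p : pair(new String[] { "a", "b", "c" })) {
            System.out.println(p);
        }
    }
}
```

Without the index parameter, the callback would have to keep its own mutable counter to know which array slot the current worker belongs to.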

@akarnokd akarnokd changed the title from "2.x: API to get distinct Workers from some Schedulers" to "2.x: internal API to get distinct Workers from some Schedulers" Nov 27, 2017
@artem-zinnatullin
Contributor

👍

@akarnokd akarnokd merged commit d2fe631 into ReactiveX:2.x Nov 27, 2017
@akarnokd akarnokd deleted the SchedulerMultiWorker branch November 27, 2017 22:12
Collaborator

@davidmoten davidmoten left a comment

Good idea, will be useful.

@davidmoten
Collaborator

The PR has already been merged but for the record I approve too

@akarnokd
Member Author

Thanks for the reviews @artem-zinnatullin @vanniktech & @davidmoten !
