-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PCollection data sampling for Java SDK harness #25064 #25354
Conversation
Can you address the checkstyle failures?
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java
Outdated
Show resolved
Hide resolved
* Modifies the given ProcessBundleDescriptor by adding a DataSampling operation as a consumer to | ||
* every PCollection. | ||
*/ | ||
public class DataSamplingDescriptorModifier implements ProcessBundleDescriptorModifier { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you want to insert this as a separate transform in the graph instead of modifying
Line 174 in d20d0b0
public FnDataReceiver<WindowedValue<?>> getMultiplexingConsumer(String pCollectionId) { |
Also typically we would ask the runner to do these PBD modifications itself so it could selectively choose which PCollections to sample and which not to. This would also remove the need for the SDK to check for an experiment, allow the runner to choose which to sample without needing to pass in PCollection ids (the PBD ids would be enough). There is a cost though since transforms impose msec and state transition book keeping overhead which is not insignificant and you'll make the current single consumer hot-path in PCollectionConsumerRegistry always use the multi-consumer variant which has additional overhead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The intent is to allow the SDK to define how it wants to implement sampling. This simplifies the protocol implementation. The runner PBD modification suggestion implies that an SDK needs to understand a new well-known transform in order to also implement this sampling protocol. Not impossible, but given that the original requirements for sampling was to sample everything given the pipeline option, I thought this would add complexity.
I modified the graph in this way b/c this was the simplest way to add future-proofed data sampling. This cuts down on the amount of code changes now for every receiver type and for the future. I hadn't thought about this forcing to use the multi-consumer variant.
Would an alternate suggestion be to create a new ExecutionState implementation (or ExecutionStateImpl subclass) that also adds DataSampling? That way the implementation can switch when it sees that data sampling is enabled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking that you would update PCollectionConsumerRegistry#MetricTrackingFnDataReceiver
and PCollectionConsumerRegistry#MultiplexingMetricTrackingFnDataReceiver
and add an if(sampling) { doSampling }
to them. The if
should be really cheap since the value should always be false or true so the JVMs ability to predict the branch should be very high. Alternatively you could also create a sub-class of the three receivers there to add sampling as necessary.
You can validate the if
approach by running the JMH benchmark:
./gradlew -info :sdks:java:harness:jmh:jmh -Pbenchmark=ProcessBundleBenchmark.testLargeBundle
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gotcha, that makes sense. I changed to the suggested implementation and ran the benchmark:
Benchmark with this PR (data sampling not enabled)
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1640.020 ± 236.240 ops/s
:sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 3,5,main]) completed. Took 5 mins 9.573 secs.
Benchmark with this PR (data sampling enabled)
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1094.943 ± 99.748 ops/s
:sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 4,5,main]) completed. Took 5 mins 9.603 secs.
Benchmark at upstream/master
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1808.108 ± 30.062 ops/s
:sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 5,5,main]) completed. Took 5 mins 9.064 secs.
Clearly, there's some environmental factors affecting the benchmarking. But, I think it can at least be said this PR with data sampling disabled won't completely hinder performance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Redone with atomics:
yes data sampling (no atomics)
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1150.680 ± 25.650 ops/s
:sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 7,5,main]) completed. Took 5 mins 9.453 secs.
yes data sampling (with atomics)
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1547.291 ± 50.887 ops/s
:sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 6,5,main]) completed. Took 5 mins 8.804 secs.
no data sampling
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1709.427 ± 38.715 ops/s
:sdks:java:harness:jmh:jmh (Thread[included builds,5,main]) completed. Took 5 mins 8.804 secs.
At head
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1807.738 ± 15.295 ops/s
:sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 2,5,main]) completed. Took 5 mins 8.921 secs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks much better since the variation in the runs is smaller but 5% is still not small.
Hopefully marking the fields final will address that but if not we might want to go with the subclass route:
SamplingAndMetricTrackingFnDataReceiver extends MetricTrackingFnDataReceiver {
public void accept(WindowedValue<T> value) {
sample(value);
super.accept(value);
}
Or take a look at the profiling information. If you follow the instructions here to install cloud profiler locally
// Agent: (see https://cloud.google.com/profiler/docs/profiling-java#installing-profiler for instructions on how to install) |
and then authenticate with gcloud you can run (replacing the GCP project with one that you have access to) will upload the results to Google Cloud Profiler:
./gradlew -info :sdks:java:harness:jmh:jmh -Pbenchmark=ProcessBundleBenchmark.testLargeBundle -PgcpProject=...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the problem was that I was measuring the null hypothesis at upstream head, instead of origin head. I re-ran the benchmark and saw that the benchmark at origin is the same as data sampling enabled. Maybe there were some changes that resulted in some performance differences?
yes sampling (with atomics)
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1584.137 ± 18.961 ops/s
:sdks:java:harness:jmh:jmh (Thread[included builds,5,main]) completed. Took 5 mins 11.907 secs.
no data sampling
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1699.238 ± 16.382 ops/s
:sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 4,5,main]) completed. Took 5 mins 11.424 secs.
at upstream head
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1737.774 ± 34.448 ops/s
:sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 4,5,main]) completed. Took 5 mins 8.815 secs.
at origin head
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1698.047 ± 17.290 ops/s
:sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 4,5,main]) completed. Took 5 mins 10.494 secs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good to know and yes there were some performance optimizations that I added on these code paths recently
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also ran with the profiling enabled but wasn't able to identify any more points for improvement.
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
Outdated
Show resolved
Hide resolved
Assigning reviewers. If you would like to opt out of this review, comment R: @lukecwik for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
.../java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
Outdated
Show resolved
Hide resolved
.../java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
Outdated
Show resolved
Hide resolved
Can you rebase on top of master and then I can do the next deep review round? |
…trol/ProcessBundleHandler.java Co-authored-by: Lukasz Cwik <lcwik@google.com>
Co-authored-by: Lukasz Cwik <lcwik@google.com>
793f189
to
07d37ea
Compare
Rebased with master, thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add tests for:
- PCollectionConsumerRegistryTest to show that non-null DataSampler causes sampling to happen
- DataSampler concurrency between adding a new output sampler for a new PCollection and taking samples
- OutputSampler concurrency between multiple samples being taken in parallel and samples being returned.
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
Outdated
Show resolved
Hide resolved
@@ -349,6 +386,10 @@ public void accept(WindowedValue<T> input) throws Exception { | |||
// when we have window optimization. | |||
this.sampledByteSizeDistribution.tryUpdate(input.getValue(), coder); | |||
|
|||
if (outputSampler != null) { | |||
outputSampler.sample(input.getValue()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't you want to sample the value with the windowing information attached?
The runner would be responsible for pulling out the attributes that are being sampled that can be introspected and/or saved.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do, but I don't want to alter the encoding of the sampled bytes when sending back to the runner. It's not impossible, but I've found that there are a lot of mines when changing encodings.
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
Show resolved
Hide resolved
Co-authored-by: Lukasz Cwik <lcwik@google.com>
Added requested tests, lmk if I can improve them. I don't have a lot of experience with making testing multi-threaded Java code. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nits for code and comments, just looking for improvements on the concurrency tests.
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
Show resolved
Hide resolved
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
Show resolved
Hide resolved
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
Show resolved
Hide resolved
…ug/DataSampler.java Co-authored-by: Lukasz Cwik <lcwik@google.com>
…ug/DataSamplerTest.java Co-authored-by: Lukasz Cwik <lcwik@google.com>
…ug/DataSamplerTest.java Co-authored-by: Lukasz Cwik <lcwik@google.com>
…ug/DataSamplerTest.java Co-authored-by: Lukasz Cwik <lcwik@google.com>
Run Java_PVR_Flink_Docker PreCommit |
Run Java PreCommit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
resampleIndex = 0; | ||
} | ||
|
||
ByteStringOutputStream stream = new ByteStringOutputStream(); | ||
for (T el : copiedBuffer) { | ||
for (int i = 0; i < bufferToSend.size(); i++) { | ||
int index = (sampleIndex + i) % bufferToSend.size(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The specification doesn't say anything about having these ordered in the response based upon oldest to newest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, but this did make testing it easier. Without this, the elements are read out of order because old samples are overwritten in the array.
Java PreCommit passed, just GH UI failed to update in a timely fashion: https://ci-beam.apache.org/job/beam_PreCommit_Java_Phrase/5948/ |
…5354) PCollection data sampling for Java SDK harness apache#25064 This adds the capability for the Java SDK harness to sample in-flight elements. The implementation modifies the ProcessBundleDescriptor when received by the BundleProcessor to create additional DataSampling PTransforms on PCollections. The samples are then returned when the SDK receives a SampleDataRequest. Task apache#25064
…5354) PCollection data sampling for Java SDK harness apache#25064 This adds the capability for the Java SDK harness to sample in-flight elements. The implementation modifies the ProcessBundleDescriptor when received by the BundleProcessor to create additional DataSampling PTransforms on PCollections. The samples are then returned when the SDK receives a SampleDataRequest. Task apache#25064
…5354) PCollection data sampling for Java SDK harness apache#25064 This adds the capability for the Java SDK harness to sample in-flight elements. The implementation modifies the ProcessBundleDescriptor when received by the BundleProcessor to create additional DataSampling PTransforms on PCollections. The samples are then returned when the SDK receives a SampleDataRequest. Task apache#25064
This adds the capability for the Java SDK harness to sample in-flight elements. The implementation modifies the ProcessBundleDescriptor when received by the BundleProcessor to create additional DataSampling PTransforms on PCollections. The samples are then returned when the SDK receives a SampleDataRequest.
Task #25064
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.