Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PCollection data sampling for Java SDK harness #25064 #25354

Merged
merged 43 commits into from
Feb 22, 2023

Conversation

rohdesamuel
Copy link
Contributor

@rohdesamuel rohdesamuel commented Feb 6, 2023

This adds the capability for the Java SDK harness to sample in-flight elements. The implementation modifies the ProcessBundleDescriptor when received by the BundleProcessor to create additional DataSampling PTransforms on PCollections. The samples are then returned when the SDK receives a SampleDataRequest.

Task #25064


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI.

@github-actions github-actions bot added the java label Feb 6, 2023
@lukecwik lukecwik changed the title Data sampling java PCollection data sampling for Java SDK harness #25064 Feb 7, 2023
@lukecwik lukecwik marked this pull request as ready for review February 7, 2023 20:25
@lukecwik
Copy link
Member

lukecwik commented Feb 7, 2023

Can you address the checkstyle failures?
see https://ci-beam.apache.org/job/beam_PreCommit_Spotless_Commit/25154/checkstyle/new/#issuesContent

Details | File | Package | Category | Type | Severity | Age
-- | -- | -- | -- | -- | -- | --
  | ProcessBundleHandler.java:23 | org.apache.beam.fn.harness.control | Imports | AvoidStarImportCheck | Error | 1
Using the '.*' form of import should be avoided - java.util.*.Since Checkstyle 3.0Checks that there are no import statements that use the * notation.Rationale: Importing all classes from a package or static members from a class leads to tight coupling between packages or classes and might lead to problems when a new version of a library introduces name clashes.
  | DataSamplingDescriptorModifier.java:1 | org.apache.beam.fn.harness.debug | Javadoc | JavadocPackageCheck | Error | 1
Missing package-info.java file.Since Checkstyle 5.0Checks that each Java package has a Javadoc file used for commenting. By default it only allows a package-info.java file, but can be configured to allow a package.html file.An error will be reported if both files exist as this is not allowed by the Javadoc tool.
  | ProcessBundleDescriptorModifier.java:40 | org.apache.beam.fn.harness | Naming | MethodNameCheck | Error | 1
Name 'ModifyProcessBundleDescriptor' must match pattern '^[a-z][a-zA-Z0-9]*(_[a-zA-Z0-9]+)*.Checks that method names conform to a format specified by the format property.Also, checks if a method name has the same name as the residing class. The default is false (it is not allowed). It is legal in Java to have method with the same name as a class. As long as a return type is specified it is a method and not a constructor which it could be easily confused as. Does not check-style the name of an overridden methods because the developer does not have a choice in renaming such methods.
  | DataSamplerTest.java:27 | org.apache.beam.fn.harness.debug | Imports | AvoidStarImportCheck | Error | 1
  | DataSamplingDescriptorModifierTest.java:41 | org.apache.beam.fn.harness.debug | Naming | LocalFinalVariableNameCheck | Error | 1
Name 'PCOLLECTION_ID_A' must match pattern '^[a-z][a-zA-Z0-9]*.Checks that local final variable names conform to a format specified by the format property. A catch parameter and resources in try statements are considered to be a local, final variables.
  | DataSamplingDescriptorModifierTest.java:42 | org.apache.beam.fn.harness.debug | Naming | LocalFinalVariableNameCheck | Error | 1
  | DataSamplingDescriptorModifierTest.java:43 | org.apache.beam.fn.harness.debug | Naming | LocalFinalVariableNameCheck | Error | 1
  | DataSamplingDescriptorModifierTest.java:44 | org.apache.beam.fn.harness.debug | Naming | LocalFinalVariableNameCheck | Error | 1
  | DataSamplingDescriptorModifierTest.java:71 | org.apache.beam.fn.harness.debug | Naming | LocalFinalVariableNameCheck | Error | 1
  | DataSamplingDescriptorModifierTest.java:72 | org.apache.beam.fn.harness.debug | Naming | LocalFinalVariableNameCheck | Error | 1
  | DataSamplingFnRunnerTest.java:22 | org.apache.beam.fn.harness.debug | Imports | AvoidStarImportCheck | Error | 1
  | DataSamplingFnRunnerTest.java:25 | org.apache.beam.fn.harness.debug | Imports | AvoidStarImportCheck | Error | 1

@github-actions
Copy link
Contributor

github-actions bot commented Feb 7, 2023

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

* Modifies the given ProcessBundleDescriptor by adding a DataSampling operation as a consumer to
* every PCollection.
*/
public class DataSamplingDescriptorModifier implements ProcessBundleDescriptorModifier {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you want to insert this as a separate transform in the graph instead of modifying

public FnDataReceiver<WindowedValue<?>> getMultiplexingConsumer(String pCollectionId) {
to support passing elements to the OutputSampler or being an OutputSampler itself?

Also typically we would ask the runner to do these PBD modifications itself so it could selectively choose which PCollections to sample and which not to. This would also remove the need for the SDK to check for an experiment, allow the runner to choose which to sample without needing to pass in PCollection ids (the PBD ids would be enough). There is a cost though since transforms impose msec and state transition book keeping overhead which is not insignificant and you'll make the current single consumer hot-path in PCollectionConsumerRegistry always use the multi-consumer variant which has additional overhead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intent is to allow the SDK to define how it wants to implement sampling. This simplifies the protocol implementation. The runner PBD modification suggestion implies that an SDK needs to understand a new well-known transform in order to also implement this sampling protocol. Not impossible, but given that the original requirements for sampling was to sample everything given the pipeline option, I thought this would add complexity.

I modified the graph in this way b/c this was the simplest way to add future-proofed data sampling. This cuts down on the amount of code changes now for every receiver type and for the future. I hadn't thought about this forcing to use the multi-consumer variant.

Would an alternate suggestion be to create a new ExecutionState implementation (or ExecutionStateImpl subclass) that also adds DataSampling? That way the implementation can switch when it sees that data sampling is enabled.

Copy link
Member

@lukecwik lukecwik Feb 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that you would update PCollectionConsumerRegistry#MetricTrackingFnDataReceiver and PCollectionConsumerRegistry#MultiplexingMetricTrackingFnDataReceiver and add an if(sampling) { doSampling } to them. The if should be really cheap since the value should always be false or true so the JVMs ability to predict the branch should be very high. Alternatively you could also create a sub-class of the three receivers there to add sampling as necessary.

You can validate the if approach by running the JMH benchmark:

./gradlew -info :sdks:java:harness:jmh:jmh -Pbenchmark=ProcessBundleBenchmark.testLargeBundle

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, that makes sense. I changed to the suggested implementation and ran the benchmark:

Benchmark with this PR (data sampling not enabled)
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1640.020 ± 236.240 ops/s
:sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 3,5,main]) completed. Took 5 mins 9.573 secs.

Benchmark with this PR (data sampling enabled)
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1094.943 ± 99.748 ops/s
:sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 4,5,main]) completed. Took 5 mins 9.603 secs.

Benchmark at upstream/master
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1808.108 ± 30.062 ops/s
:sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 5,5,main]) completed. Took 5 mins 9.064 secs.

Clearly, there's some environmental factors affecting the benchmarking. But, I think it can at least be said this PR with data sampling disabled won't completely hinder performance.

Copy link
Contributor Author

@rohdesamuel rohdesamuel Feb 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redone with atomics:

yes data sampling (no atomics)
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1150.680 ± 25.650 ops/s
:sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 7,5,main]) completed. Took 5 mins 9.453 secs.

yes data sampling (with atomics)
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1547.291 ± 50.887 ops/s
:sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 6,5,main]) completed. Took 5 mins 8.804 secs.

no data sampling
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1709.427 ± 38.715 ops/s
:sdks:java:harness:jmh:jmh (Thread[included builds,5,main]) completed. Took 5 mins 8.804 secs.

At head
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1807.738 ± 15.295 ops/s
:sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 2,5,main]) completed. Took 5 mins 8.921 secs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks much better since the variation in the runs is smaller but 5% is still not small.

Hopefully marking the fields final will address that but if not we might want to go with the subclass route:

SamplingAndMetricTrackingFnDataReceiver extends MetricTrackingFnDataReceiver {

public void accept(WindowedValue<T> value) {
  sample(value);
  super.accept(value);
}

Or take a look at the profiling information. If you follow the instructions here to install cloud profiler locally

// Agent: (see https://cloud.google.com/profiler/docs/profiling-java#installing-profiler for instructions on how to install)

and then authenticate with gcloud you can run (replacing the GCP project with one that you have access to) will upload the results to Google Cloud Profiler:
./gradlew -info :sdks:java:harness:jmh:jmh -Pbenchmark=ProcessBundleBenchmark.testLargeBundle -PgcpProject=...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the problem was that I was measuring the null hypothesis at upstream head, instead of origin head. I re-ran the benchmark and saw that the benchmark at origin is the same as data sampling enabled. Maybe there were some changes that resulted in some performance differences?

yes sampling (with atomics)
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1584.137 ± 18.961 ops/s
:sdks:java:harness:jmh:jmh (Thread[included builds,5,main]) completed. Took 5 mins 11.907 secs.

no data sampling
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1699.238 ± 16.382 ops/s
:sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 4,5,main]) completed. Took 5 mins 11.424 secs.

at upstream head
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1737.774 ± 34.448 ops/s
:sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 4,5,main]) completed. Took 5 mins 8.815 secs.

at origin head
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 1698.047 ± 17.290 ops/s
:sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 4,5,main]) completed. Took 5 mins 10.494 secs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good to know and yes there were some performance optimizations that I added on these code paths recently

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also ran with the profiling enabled but wasn't able to identify any more points for improvement.

@github-actions
Copy link
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @lukecwik for label java.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@lukecwik
Copy link
Member

Can you rebase on top of master and then I can do the next deep review round?

@rohdesamuel
Copy link
Contributor Author

Rebased with master, thanks!

Copy link
Member

@lukecwik lukecwik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add tests for:

  • PCollectionConsumerRegistryTest to show that non-null DataSampler causes sampling to happen
  • DataSampler concurrency between adding a new output sampler for a new PCollection and taking samples
  • OutputSampler concurrency between multiple samples being taken in parallel and samples being returned.

@@ -349,6 +386,10 @@ public void accept(WindowedValue<T> input) throws Exception {
// when we have window optimization.
this.sampledByteSizeDistribution.tryUpdate(input.getValue(), coder);

if (outputSampler != null) {
outputSampler.sample(input.getValue());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you want to sample the value with the windowing information attached?

The runner would be responsible for pulling out the attributes that are being sampled that can be introspected and/or saved.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do, but I don't want to alter the encoding of the sampled bytes when sending back to the runner. It's not impossible, but I've found that there are a lot of mines when changing encodings.

@rohdesamuel
Copy link
Contributor Author

Added requested tests, lmk if I can improve them. I don't have a lot of experience with making testing multi-threaded Java code.

Copy link
Member

@lukecwik lukecwik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nits for code and comments, just looking for improvements on the concurrency tests.

rohdesamuel and others added 6 commits February 21, 2023 15:04
…ug/DataSampler.java

Co-authored-by: Lukasz Cwik <lcwik@google.com>
…ug/DataSamplerTest.java

Co-authored-by: Lukasz Cwik <lcwik@google.com>
…ug/DataSamplerTest.java

Co-authored-by: Lukasz Cwik <lcwik@google.com>
…ug/DataSamplerTest.java

Co-authored-by: Lukasz Cwik <lcwik@google.com>
@rohdesamuel
Copy link
Contributor Author

Run Java_PVR_Flink_Docker PreCommit

@rohdesamuel
Copy link
Contributor Author

Run Java PreCommit

Copy link
Member

@lukecwik lukecwik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

resampleIndex = 0;
}

ByteStringOutputStream stream = new ByteStringOutputStream();
for (T el : copiedBuffer) {
for (int i = 0; i < bufferToSend.size(); i++) {
int index = (sampleIndex + i) % bufferToSend.size();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The specification doesn't say anything about having these ordered in the response based upon oldest to newest.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, but this did make testing it easier. Without this, the elements are read out of order because old samples are overwritten in the array.

@lukecwik
Copy link
Member

Java PreCommit passed, just GH UI failed to update in a timely fashion: https://ci-beam.apache.org/job/beam_PreCommit_Java_Phrase/5948/

@lukecwik lukecwik merged commit 83cfdb8 into apache:master Feb 22, 2023
lostluck pushed a commit to lostluck/beam that referenced this pull request Feb 22, 2023
…5354)

PCollection data sampling for Java SDK harness apache#25064

This adds the capability for the Java SDK harness to sample in-flight elements. The implementation modifies the ProcessBundleDescriptor when received by the BundleProcessor to create additional DataSampling PTransforms on PCollections. The samples are then returned when the SDK receives a SampleDataRequest.

Task apache#25064
ruslan-ikhsan pushed a commit to akvelon/beam that referenced this pull request Mar 10, 2023
…5354)

PCollection data sampling for Java SDK harness apache#25064

This adds the capability for the Java SDK harness to sample in-flight elements. The implementation modifies the ProcessBundleDescriptor when received by the BundleProcessor to create additional DataSampling PTransforms on PCollections. The samples are then returned when the SDK receives a SampleDataRequest.

Task apache#25064
cushon pushed a commit to cushon/beam that referenced this pull request Mar 16, 2023
…5354)

PCollection data sampling for Java SDK harness apache#25064

This adds the capability for the Java SDK harness to sample in-flight elements. The implementation modifies the ProcessBundleDescriptor when received by the BundleProcessor to create additional DataSampling PTransforms on PCollections. The samples are then returned when the SDK receives a SampleDataRequest.

Task apache#25064
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants