PCollection data sampling for Java SDK harness #25064 #25354

Merged 43 commits on Feb 22, 2023
Commits
cd630e4
Data Sampling Java Impl
Jan 26, 2023
e530d14
comments
Jan 26, 2023
a09f5d6
add PBD id to context
rohdesamuel Jan 31, 2023
1cb08f9
merge
Jan 31, 2023
566c7b8
Add more tests and spotless
Jan 31, 2023
7395a2e
Finish Java data sampling impl with tests, adding comments
Feb 1, 2023
4c1253f
more comments, remove Payload
Feb 1, 2023
3d88254
more comments
Feb 1, 2023
5bbef91
spotless
Feb 1, 2023
6993b83
Encode in the nested context
rohdesamuel Feb 3, 2023
769902f
Update sdks/java/harness/src/main/java/org/apache/beam/fn/harness/con…
rohdesamuel Feb 7, 2023
5c06d4e
Apply suggestions from code review
rohdesamuel Feb 7, 2023
99087e8
address pr comments
Feb 9, 2023
5609e4a
give default pbd id to test context
rohdesamuel Feb 9, 2023
694c929
address spotlesscheck
rohdesamuel Feb 9, 2023
519aece
spotless apply
rohdesamuel Feb 9, 2023
8efbb15
style guide spotless apply
rohdesamuel Feb 9, 2023
cd3732d
add serviceloader
rohdesamuel Feb 9, 2023
415e3f0
change datasampling to modify the consumers and not graph for sampling
rohdesamuel Feb 10, 2023
553fc5e
remove redundant SamplerState obj
rohdesamuel Feb 10, 2023
e260eb4
spotless
rohdesamuel Feb 10, 2023
4f29308
replace mutex with atomics in output sampler to reduce contention
rohdesamuel Feb 10, 2023
8cbc6c8
spotless and fix OutputSamplerTest
rohdesamuel Feb 10, 2023
f67234f
Update sdks/java/harness/src/main/java/org/apache/beam/fn/harness/dat…
rohdesamuel Feb 13, 2023
6815330
Update sdks/java/harness/src/main/java/org/apache/beam/fn/harness/dat…
rohdesamuel Feb 13, 2023
4848725
Update sdks/java/harness/src/main/java/org/apache/beam/fn/harness/deb…
rohdesamuel Feb 13, 2023
6c16576
always init outputsampler
rohdesamuel Feb 13, 2023
822587d
add final to DataSampler in FnHarness
rohdesamuel Feb 13, 2023
fe9ab2a
spotless apply
rohdesamuel Feb 13, 2023
07d37ea
update from proto names
rohdesamuel Feb 13, 2023
5e9c4b0
spotless bugs
Feb 14, 2023
f5f97fb
Apply suggestions from code review
rohdesamuel Feb 14, 2023
c3db7c0
address pr comments
Feb 14, 2023
fce6d69
spotlessapply and add byte[] test
Feb 15, 2023
69d8bb4
validate datasampler args
Feb 15, 2023
be2ebe4
add concurrency tests
Feb 15, 2023
93af06e
Merge branch 'master' into data-sampling-java
lukecwik Feb 21, 2023
cbdbbc3
Update sdks/java/harness/src/main/java/org/apache/beam/fn/harness/deb…
rohdesamuel Feb 21, 2023
8cc8ee0
Update sdks/java/harness/src/test/java/org/apache/beam/fn/harness/deb…
rohdesamuel Feb 21, 2023
10aa7de
Update sdks/java/harness/src/test/java/org/apache/beam/fn/harness/deb…
rohdesamuel Feb 21, 2023
04c6c88
Update sdks/java/harness/src/test/java/org/apache/beam/fn/harness/deb…
rohdesamuel Feb 21, 2023
862439b
improve contention tests
Feb 22, 2023
490be4a
spotless
Feb 22, 2023
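
Commit 4f29308 in the list above replaces a mutex with atomics in the output sampler to reduce contention. That change is not part of the commit shown on this page, so the following is only a minimal sketch of the general idea, assuming a fixed-size ring of sample slots and a "keep every Nth element" policy. `LockFreeSampler` and its parameters are hypothetical names, not the PR's `OutputSampler`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class AtomicSamplerSketch {

  /** Hypothetical sampler: keeps every Nth element in a ring buffer without taking a lock. */
  static final class LockFreeSampler<T> {
    private final AtomicReferenceArray<T> slots;
    private final AtomicLong seen = new AtomicLong();
    private final int sampleEveryN;

    LockFreeSampler(int maxSamples, int sampleEveryN) {
      if (maxSamples <= 0 || sampleEveryN <= 0) {
        throw new IllegalArgumentException("maxSamples and sampleEveryN must be positive");
      }
      this.slots = new AtomicReferenceArray<>(maxSamples);
      this.sampleEveryN = sampleEveryN;
    }

    /** Hot path: one atomic increment per element, plus one atomic write when sampled. */
    void sample(T element) {
      long index = seen.getAndIncrement();
      if (index % sampleEveryN != 0) {
        return;
      }
      // Newer samples overwrite the oldest slot once the ring is full.
      int slot = (int) ((index / sampleEveryN) % slots.length());
      slots.set(slot, element);
    }

    /** Returns the currently stored samples in slot order. */
    List<T> samples() {
      List<T> out = new ArrayList<>();
      for (int i = 0; i < slots.length(); i++) {
        T element = slots.get(i);
        if (element != null) {
          out.add(element);
        }
      }
      return out;
    }
  }

  public static void main(String[] args) {
    LockFreeSampler<Integer> sampler = new LockFreeSampler<>(2, 3);
    for (int i = 0; i < 10; i++) {
      sampler.sample(i);
    }
    System.out.println(sampler.samples()); // prints [6, 9]
  }
}
```

The trade-off in this sketch is that readers may observe a mix of older and newer samples while writers are active, which is usually acceptable for debugging samples.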
change datasampling to modify the consumers and not graph for sampling
rohdesamuel committed Feb 13, 2023
commit 415e3f0ab8b34b3ca237a3c40c18b0b4eb64cf90
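
In the diff below, sampling moves out of the ProcessBundleDescriptor graph and into the PCollection consumers: each metric-tracking receiver optionally holds an `OutputSampler` and calls `sample()` on every element before passing it downstream. A minimal, Beam-free sketch of that wrapping pattern follows. `SimpleSampler` and `withSampling` are hypothetical stand-ins invented for illustration, not the PR's `OutputSampler` or `DataSampler` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ConsumerSamplingSketch {

  /** Hypothetical stand-in for an output sampler: keeps at most maxSamples elements. */
  static final class SimpleSampler<T> {
    private final int maxSamples;
    private final List<T> samples = new ArrayList<>();

    SimpleSampler(int maxSamples) {
      this.maxSamples = maxSamples;
    }

    synchronized void sample(T element) {
      if (samples.size() < maxSamples) {
        samples.add(element);
      }
    }

    synchronized List<T> samples() {
      return new ArrayList<>(samples);
    }
  }

  /**
   * Wraps a downstream consumer so each element is sampled and then forwarded unchanged.
   * A null sampler means sampling is disabled, mirroring the nullable DataSampler in the diff.
   */
  static <T> Consumer<T> withSampling(Consumer<T> delegate, SimpleSampler<T> sampler) {
    if (sampler == null) {
      return delegate;
    }
    return element -> {
      sampler.sample(element);
      delegate.accept(element);
    };
  }

  public static void main(String[] args) {
    List<String> downstream = new ArrayList<>();
    SimpleSampler<String> sampler = new SimpleSampler<>(2);
    Consumer<String> consumer = withSampling(downstream::add, sampler);
    for (String s : new String[] {"a", "b", "c"}) {
      consumer.accept(s);
    }
    System.out.println(downstream);        // prints [a, b, c]
    System.out.println(sampler.samples()); // prints [a, b]
  }
}
```

Unlike this sketch, the receivers in the diff always exist and check `outputSampler != null` inside `accept`; the observable behavior is the same, since the pipeline graph stays untouched and elements flow through unmodified.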
@@ -17,7 +17,6 @@
*/
package org.apache.beam.fn.harness;

import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
@@ -33,7 +32,6 @@
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient;
import org.apache.beam.fn.harness.debug.DataSampler;
import org.apache.beam.fn.harness.debug.DataSamplingDescriptorModifier;
import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.fn.harness.status.BeamFnStatusClient;
@@ -96,7 +94,7 @@ public class FnHarness {
private static final String RUNNER_CAPABILITIES = "RUNNER_CAPABILITIES";
private static final String ENABLE_DATA_SAMPLING_EXPERIMENT = "enable_data_sampling";
private static final Logger LOG = LoggerFactory.getLogger(FnHarness.class);
private static final DataSampler dataSampler = new DataSampler();
private static DataSampler dataSampler = new DataSampler();

private static Endpoints.ApiServiceDescriptor getApiServiceDescriptor(String descriptor)
throws TextFormat.ParseException {
@@ -255,16 +253,12 @@ public static void main(

FinalizeBundleHandler finalizeBundleHandler = new FinalizeBundleHandler(executorService);

// Add any graph modifications.
List<ProcessBundleDescriptorModifier> modifierList = new ArrayList<>();
// Create the sampler, if the experiment is enabled.
Optional<List<String>> experimentList =
Optional.ofNullable(options.as(ExperimentalOptions.class).getExperiments());

// If data sampling is enabled, then modify the graph to add any DataSampling Operations.
if (experimentList.isPresent()
&& experimentList.get().contains(ENABLE_DATA_SAMPLING_EXPERIMENT)) {
modifierList.add(new DataSamplingDescriptorModifier());
}
boolean shouldSample =
experimentList.isPresent()
&& experimentList.get().contains(ENABLE_DATA_SAMPLING_EXPERIMENT);

// Retrieves the ProcessBundleDescriptor from cache. Requests the PBD from the Runner if it
// doesn't exist. Additionally, runs any graph modifications.
@@ -274,29 +268,16 @@ public static void main(
private final Cache<String, BeamFnApi.ProcessBundleDescriptor> cache =
Caches.subCache(processWideCache, PROCESS_BUNDLE_DESCRIPTORS);

private final List<ProcessBundleDescriptorModifier> modifiers = modifierList;

@Override
public BeamFnApi.ProcessBundleDescriptor apply(String id) {
return cache.computeIfAbsent(id, this::loadDescriptor);
}

private BeamFnApi.ProcessBundleDescriptor loadDescriptor(String id) {
ProcessBundleDescriptor descriptor =
blockingControlStub.getProcessBundleDescriptor(
BeamFnApi.GetProcessBundleDescriptorRequest.newBuilder()
.setProcessBundleDescriptorId(id)
.build());
for (ProcessBundleDescriptorModifier modifier : modifiers) {
try {
LOG.debug("Modifying graph with " + modifier);
descriptor = modifier.modifyProcessBundleDescriptor(descriptor);
} catch (ProcessBundleDescriptorModifier.GraphModificationException e) {
LOG.warn("Could not modify graph with " + modifier + ": " + e.getMessage());
}
}

return descriptor;
return blockingControlStub.getProcessBundleDescriptor(
BeamFnApi.GetProcessBundleDescriptorRequest.newBuilder()
.setProcessBundleDescriptorId(id)
.build());
}
};

@@ -313,7 +294,7 @@ private BeamFnApi.ProcessBundleDescriptor loadDescriptor(String id) {
metricsShortIds,
executionStateSampler,
processWideCache,
dataSampler);
shouldSample ? dataSampler : null);
logging.setProcessBundleHandler(processBundleHandler);

BeamFnStatusClient beamFnStatusClient = null;
@@ -20,13 +20,11 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.control.BundleProgressReporter;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.debug.DataSampler;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleRequest;
import org.apache.beam.model.pipeline.v1.Endpoints;
@@ -61,9 +59,6 @@ interface Context {
/** A client for handling state requests. */
BeamFnStateClient getBeamFnStateClient();

/** The id of the parent ProcessBundleDescriptor. */
String getProcessBundleDescriptorId();

/** The id of the PTransform. */
String getPTransformId();

@@ -168,13 +163,6 @@ <T> void addIncomingTimerEndpoint(
* instant provides the timeout on how long the finalization callback is valid for.
*/
DoFn.BundleFinalizer getBundleFinalizer();

/**
* A DataSampler can be used to sample in-flight elements. This is used to plumb a global
* DataSampler to DataSampler operations in order to perform said sampling. Only present when
* using the "enable_data_sampling" experiment.
*/
Optional<DataSampler> getDataSampler();
}

/**

This file was deleted.

@@ -28,7 +28,6 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.WeakHashMap;
@@ -167,7 +166,7 @@ public class ProcessBundleHandler {
@VisibleForTesting final BundleProcessorCache bundleProcessorCache;
private final Set<String> runnerCapabilities;

private final DataSampler dataSampler;
private final @Nullable DataSampler dataSampler;

public ProcessBundleHandler(
PipelineOptions options,
@@ -179,7 +178,7 @@ public ProcessBundleHandler(
ShortIdMap shortIds,
ExecutionStateSampler executionStateSampler,
Cache<Object, Object> processWideCache,
DataSampler dataSampler) {
@Nullable DataSampler dataSampler) {
this(
options,
runnerCapabilities,
@@ -208,7 +207,7 @@ public ProcessBundleHandler(
Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap,
Cache<Object, Object> processWideCache,
BundleProcessorCache bundleProcessorCache,
DataSampler dataSampler) {
@Nullable DataSampler dataSampler) {
this.options = options;
this.fnApiRegistry = fnApiRegistry;
this.beamFnDataClient = beamFnDataClient;
@@ -328,11 +327,6 @@ public BeamFnStateClient getBeamFnStateClient() {
return beamFnStateClient;
}

@Override
public String getProcessBundleDescriptorId() {
return processBundleDescriptor.getId();
}

@Override
public String getPTransformId() {
return pTransformId;
@@ -494,11 +488,6 @@ public BundleSplitListener getSplitListener() {
public BundleFinalizer getBundleFinalizer() {
return bundleFinalizer;
}

@Override
public Optional<DataSampler> getDataSampler() {
return Optional.ofNullable(dataSampler);
}
});
if (runner instanceof BeamFnDataReadRunner) {
channelRoots.add((BeamFnDataReadRunner) runner);
@@ -789,7 +778,7 @@ private BundleProcessor createBundleProcessor(
bundleProgressReporterAndRegistrar.register(stateTracker);
PCollectionConsumerRegistry pCollectionConsumerRegistry =
new PCollectionConsumerRegistry(
stateTracker, shortIds, bundleProgressReporterAndRegistrar, bundleDescriptor);
stateTracker, shortIds, bundleProgressReporterAndRegistrar, bundleDescriptor, dataSampler);
HashSet<String> processedPTransformIds = new HashSet<>();

PTransformFunctionRegistry startFunctionRegistry =
@@ -24,13 +24,16 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import javax.annotation.Nullable;
import org.apache.beam.fn.harness.HandlesSplits;
import org.apache.beam.fn.harness.control.BundleProgressReporter;
import org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionState;
import org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionStateTracker;
import org.apache.beam.fn.harness.control.Metrics;
import org.apache.beam.fn.harness.control.Metrics.BundleCounter;
import org.apache.beam.fn.harness.control.Metrics.BundleDistribution;
import org.apache.beam.fn.harness.debug.DataSampler;
import org.apache.beam.fn.harness.debug.OutputSampler;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -87,12 +90,27 @@ public static ConsumerAndMetadata forConsumer(
private final BundleProgressReporter.Registrar bundleProgressReporterRegistrar;
private final ProcessBundleDescriptor processBundleDescriptor;
private final RehydratedComponents rehydratedComponents;
private final @Nullable DataSampler dataSampler;

public PCollectionConsumerRegistry(
ExecutionStateTracker stateTracker,
ShortIdMap shortIdMap,
BundleProgressReporter.Registrar bundleProgressReporterRegistrar,
ProcessBundleDescriptor processBundleDescriptor) {
this(
stateTracker,
shortIdMap,
bundleProgressReporterRegistrar,
processBundleDescriptor,
null);
}

public PCollectionConsumerRegistry(
ExecutionStateTracker stateTracker,
ShortIdMap shortIdMap,
BundleProgressReporter.Registrar bundleProgressReporterRegistrar,
ProcessBundleDescriptor processBundleDescriptor,
@Nullable DataSampler dataSampler) {
this.stateTracker = stateTracker;
this.shortIdMap = shortIdMap;
this.pCollectionIdsToConsumers = new HashMap<>();
@@ -106,6 +124,7 @@ public PCollectionConsumerRegistry(
.putAllPcollections(processBundleDescriptor.getPcollectionsMap())
.putAllWindowingStrategies(processBundleDescriptor.getWindowingStrategiesMap())
.build());
this.dataSampler = dataSampler;
}

/**
@@ -201,16 +220,16 @@ public FnDataReceiver<WindowedValue<?>> getMultiplexingConsumer(String pCollecti
if (consumerAndMetadatas.size() == 1) {
ConsumerAndMetadata consumerAndMetadata = consumerAndMetadatas.get(0);
if (consumerAndMetadata.getConsumer() instanceof HandlesSplits) {
return new SplittingMetricTrackingFnDataReceiver(pcId, coder, consumerAndMetadata);
return new SplittingMetricTrackingFnDataReceiver(pcId, coder, consumerAndMetadata, dataSampler);
}
return new MetricTrackingFnDataReceiver(pcId, coder, consumerAndMetadata);
return new MetricTrackingFnDataReceiver(pcId, coder, consumerAndMetadata, dataSampler);
} else {
/* TODO(SDF), Consider supporting splitting each consumer individually. This would never
come up in the existing SDF expansion, but might be useful to support fused SDF nodes.
This would require dedicated delivery of the split results to each of the consumers
separately. */
return new MultiplexingMetricTrackingFnDataReceiver(
pcId, coder, ImmutableList.copyOf(consumerAndMetadatas));
pcId, coder, ImmutableList.copyOf(consumerAndMetadatas), dataSampler);
}
});
}
@@ -228,9 +247,11 @@ private class MetricTrackingFnDataReceiver<T> implements FnDataReceiver<Windowed
private final BundleCounter elementCountCounter;
private final SampleByteSizeDistribution<T> sampledByteSizeDistribution;
private final Coder<T> coder;
private OutputSampler<T> outputSampler;

public MetricTrackingFnDataReceiver(
String pCollectionId, Coder<T> coder, ConsumerAndMetadata consumerAndMetadata) {
String pCollectionId, Coder<T> coder, ConsumerAndMetadata consumerAndMetadata,
@Nullable DataSampler dataSampler) {
this.delegate = consumerAndMetadata.getConsumer();
this.executionState = consumerAndMetadata.getExecutionState();

@@ -266,6 +287,9 @@ public MetricTrackingFnDataReceiver(
bundleProgressReporterRegistrar.register(sampledByteSizeUnderlyingDistribution);

this.coder = coder;
if (dataSampler != null) {
this.outputSampler = dataSampler.sampleOutput(pCollectionId, coder);
}
}

@Override
@@ -276,6 +300,10 @@ public void accept(WindowedValue<T> input) throws Exception {
// we have window optimization.
this.sampledByteSizeDistribution.tryUpdate(input.getValue(), this.coder);

if (outputSampler != null) {
outputSampler.sample(input.getValue());
}

// Use the ExecutionStateTracker and enter an appropriate state to track the
// Process Bundle Execution time metric and also ensure user counters can get an appropriate
// metrics container.
@@ -302,9 +330,11 @@ private class MultiplexingMetricTrackingFnDataReceiver<T>
private final BundleCounter elementCountCounter;
private final SampleByteSizeDistribution<T> sampledByteSizeDistribution;
private final Coder<T> coder;
private @Nullable OutputSampler<T> outputSampler = null;

public MultiplexingMetricTrackingFnDataReceiver(
String pCollectionId, Coder<T> coder, List<ConsumerAndMetadata> consumerAndMetadatas) {
String pCollectionId, Coder<T> coder, List<ConsumerAndMetadata> consumerAndMetadatas,
@Nullable DataSampler dataSampler) {
this.consumerAndMetadatas = consumerAndMetadatas;

HashMap<String, String> labels = new HashMap<>();
@@ -339,6 +369,9 @@ public MultiplexingMetricTrackingFnDataReceiver(
bundleProgressReporterRegistrar.register(sampledByteSizeUnderlyingDistribution);

this.coder = coder;
if (dataSampler != null) {
this.outputSampler = dataSampler.sampleOutput(pCollectionId, coder);
}
}

@Override
@@ -349,6 +382,10 @@ public void accept(WindowedValue<T> input) throws Exception {
// when we have window optimization.
this.sampledByteSizeDistribution.tryUpdate(input.getValue(), coder);

if (outputSampler != null) {
outputSampler.sample(input.getValue());
Member

Don't you want to sample the value with the windowing information attached?

The runner would then be responsible for pulling out whichever sampled attributes can be introspected and/or saved.

Contributor Author

I do, but I don't want to alter the encoding of the sampled bytes when sending them back to the runner. It's not impossible, but I've found that there are a lot of pitfalls when changing encodings.
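
To make the encoding concern in this thread concrete, here is a small Beam-free sketch, assuming a simple length-prefixed byte layout. It shows that attaching metadata such as a timestamp changes the bytes a consumer must parse. The layout and the helper names (`encodeValueOnly`, `encodeWithTimestamp`, `decodeValueOnly`) are hypothetical and are not Beam's coder formats.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SampleEncodingSketch {

  /** Encodes only the element as a 4-byte length followed by UTF-8 bytes (hypothetical layout). */
  static byte[] encodeValueOnly(String value) {
    byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(Integer.BYTES + utf8.length).putInt(utf8.length).put(utf8).array();
  }

  /** Prefixes the encoded element with an 8-byte timestamp, standing in for window metadata. */
  static byte[] encodeWithTimestamp(long timestampMillis, String value) {
    byte[] encoded = encodeValueOnly(value);
    return ByteBuffer.allocate(Long.BYTES + encoded.length)
        .putLong(timestampMillis)
        .put(encoded)
        .array();
  }

  /** Decodes the value-only layout; it cannot parse bytes produced by encodeWithTimestamp. */
  static String decodeValueOnly(byte[] bytes) {
    ByteBuffer in = ByteBuffer.wrap(bytes);
    byte[] utf8 = new byte[in.getInt()];
    in.get(utf8);
    return new String(utf8, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    byte[] valueOnly = encodeValueOnly("hello");
    byte[] withTimestamp = encodeWithTimestamp(1_676_000_000_000L, "hello");
    System.out.println(decodeValueOnly(valueOnly)); // prints hello
    System.out.println(valueOnly.length);           // prints 9
    System.out.println(withTimestamp.length);       // prints 17
  }
}
```

Any reader written against the value-only layout would have to change in step with the writer, which is the kind of coordination cost the reply above alludes to.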

}

// Use the ExecutionStateTracker and enter an appropriate state to track the
// Process Bundle Execution time metric and also ensure user counters can get an appropriate
// metrics container.
@@ -377,8 +414,9 @@ private class SplittingMetricTrackingFnDataReceiver<T> extends MetricTrackingFnD
private final HandlesSplits delegate;

public SplittingMetricTrackingFnDataReceiver(
String pCollection, Coder<T> coder, ConsumerAndMetadata consumerAndMetadata) {
super(pCollection, coder, consumerAndMetadata);
String pCollection, Coder<T> coder, ConsumerAndMetadata consumerAndMetadata,
@Nullable DataSampler dataSampler) {
super(pCollection, coder, consumerAndMetadata, dataSampler);
this.delegate = (HandlesSplits) consumerAndMetadata.getConsumer();
}
