Skip to content

Commit

Permalink
fix stuckness on range movements
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu committed Sep 12, 2024
1 parent 7d74d94 commit 1e02463
Show file tree
Hide file tree
Showing 18 changed files with 415 additions and 238 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.joda.time.Duration;
Expand Down Expand Up @@ -221,10 +222,8 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {

void setWindmillServiceStreamMaxBackoffMillis(int value);

@Description(
"If true, Dataflow streaming pipeline will be running in direct path mode."
+ " VMs must have IPv6 enabled for this to work.")
@Default.Boolean(false)
@Description("Enables direct path mode for streaming engine.")
@Default.InstanceFactory(EnableWindmillServiceDirectPathFactory.class)
boolean getIsWindmillServiceDirectPathEnabled();

void setIsWindmillServiceDirectPathEnabled(boolean isWindmillServiceDirectPathEnabled);
Expand Down Expand Up @@ -302,4 +301,12 @@ public Integer create(PipelineOptions options) {
return streamingOptions.isEnableStreamingEngine() ? Integer.MAX_VALUE : 1;
}
}

/** EnableStreamingEngine defaults to false unless one of the two experiments is set. */
class EnableWindmillServiceDirectPathFactory implements DefaultValueFactory<Boolean> {
@Override
public Boolean create(PipelineOptions options) {
return ExperimentalOptions.hasExperiment(options, "enable_windmill_service_direct_path");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.metrics.MetricsLogger;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
Expand Down Expand Up @@ -82,6 +83,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributors;
Expand Down Expand Up @@ -186,7 +188,8 @@ private StreamingDataflowWorker(
AtomicReference<OperationalLimits> operationalLimits,
GrpcWindmillStreamFactory windmillStreamFactory,
Function<String, ScheduledExecutorService> executorSupplier,
ConcurrentMap<String, StageInfo> stageInfoMap) {
ConcurrentMap<String, StageInfo> stageInfoMap,
@Nullable GrpcDispatcherClient dispatcherClient) {
// Register standard file systems.
FileSystems.setDefaultPipelineOptions(options);
this.configFetcher = configFetcher;
Expand Down Expand Up @@ -232,12 +235,7 @@ private StreamingDataflowWorker(
if (isDirectPathPipeline(options)) {
FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
FanOutStreamingEngineWorkerHarness.create(
JobHeader.newBuilder()
.setJobId(options.getJobId())
.setProjectId(options.getProject())
.setWorkerId(options.getWorkerId())
.setClientId(clientId)
.build(),
createJobHeader(options, clientId),
GetWorkBudget.builder()
.setItems(chooseMaxBundlesOutstanding(options))
.setBytes(MAX_GET_WORK_FETCH_BYTES)
Expand All @@ -258,7 +256,7 @@ private StreamingDataflowWorker(
}),
stubFactory,
GetWorkBudgetDistributors.distributeEvenly(),
GrpcDispatcherClient.create(stubFactory),
Preconditions.checkNotNull(dispatcherClient),
commitWorkStream ->
StreamingEngineWorkCommitter.builder()
.setBackendWorkerToken(commitWorkStream.backendWorkerToken())
Expand Down Expand Up @@ -468,7 +466,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
operationalLimits,
configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
executorSupplier,
stageInfo);
stageInfo,
configFetcherComputationStateCacheAndWindmillClient.windmillDispatcherClient());
}

/**
Expand All @@ -488,6 +487,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
WindmillServerStub windmillServer;
ComputationStateCache computationStateCache;
GrpcWindmillStreamFactory windmillStreamFactory;
ConfigFetcherComputationStateCacheAndWindmillClient.Builder builder =
ConfigFetcherComputationStateCacheAndWindmillClient.builder();
if (options.isEnableStreamingEngine()) {
GrpcDispatcherClient dispatcherClient =
GrpcDispatcherClient.create(createStubFactory(options));
Expand All @@ -510,19 +511,20 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
.build();
windmillServer = GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient);
builder.setWindmillDispatcherClient(dispatcherClient);
} else {
if (options.getWindmillServiceEndpoint() != null
|| options.getLocalWindmillHostport().startsWith("grpc:")) {
GrpcDispatcherClient dispatcherClient =
GrpcDispatcherClient.create(createStubFactory(options));
windmillStreamFactory =
windmillStreamFactoryBuilder
.setHealthCheckIntervalMillis(
options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
.build();
windmillServer =
GrpcWindmillServer.create(
options,
windmillStreamFactory,
GrpcDispatcherClient.create(createStubFactory(options)));
GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient);
builder.setWindmillDispatcherClient(dispatcherClient);
} else {
windmillStreamFactory = windmillStreamFactoryBuilder.build();
windmillServer = new JniWindmillApplianceServer(options.getLocalWindmillHostport());
Expand All @@ -532,8 +534,12 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
computationStateCache = computationStateCacheFactory.apply(configFetcher);
}

return ConfigFetcherComputationStateCacheAndWindmillClient.create(
configFetcher, computationStateCache, windmillServer, windmillStreamFactory);
return builder
.setConfigFetcher(configFetcher)
.setComputationStateCache(computationStateCache)
.setWindmillServer(windmillServer)
.setWindmillStreamFactory(windmillStreamFactory)
.build();
}

private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions options) {
Expand Down Expand Up @@ -577,7 +583,8 @@ static StreamingDataflowWorker forTesting(
Function<String, ScheduledExecutorService> executorSupplier,
int localRetryTimeoutMs,
OperationalLimits limits,
StreamingCounters streamingCounters) {
StreamingCounters streamingCounters,
WindmillStubFactory stubFactory) {
ConcurrentMap<String, StageInfo> stageInfo = new ConcurrentHashMap<>();
AtomicReference<OperationalLimits> operationalLimits = new AtomicReference<>(limits);
BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
Expand Down Expand Up @@ -677,7 +684,8 @@ static StreamingDataflowWorker forTesting(
.build()
: windmillStreamFactory.build(),
executorSupplier,
stageInfo);
stageInfo,
GrpcDispatcherClient.create(stubFactory));
}

private static void onPipelineConfig(
Expand Down Expand Up @@ -706,13 +714,7 @@ private static GrpcWindmillStreamFactory.Builder createGrpcwindmillStreamFactory
!options.isEnableStreamingEngine() && options.getLocalWindmillHostport() != null
? GrpcWindmillServer.LOCALHOST_MAX_BACKOFF
: Duration.millis(options.getWindmillServiceStreamMaxBackoffMillis());
return GrpcWindmillStreamFactory.of(
JobHeader.newBuilder()
.setJobId(options.getJobId())
.setProjectId(options.getProject())
.setWorkerId(options.getWorkerId())
.setClientId(clientId)
.build())
return GrpcWindmillStreamFactory.of(createJobHeader(options, clientId))
.setWindmillMessagesBetweenIsReadyChecks(options.getWindmillMessagesBetweenIsReadyChecks())
.setMaxBackOffSupplier(() -> maxBackoff)
.setLogEveryNStreamFailures(options.getWindmillServiceStreamingLogEveryNStreamFailures())
Expand All @@ -723,6 +725,15 @@ private static GrpcWindmillStreamFactory.Builder createGrpcwindmillStreamFactory
options, "streaming_engine_disable_new_heartbeat_requests"));
}

private static JobHeader createJobHeader(DataflowWorkerHarnessOptions options, long clientId) {
return JobHeader.newBuilder()
.setJobId(options.getJobId())
.setProjectId(options.getProject())
.setWorkerId(options.getWorkerId())
.setClientId(clientId)
.build();
}

private static BoundedQueueExecutor createWorkUnitExecutor(DataflowWorkerHarnessOptions options) {
return new BoundedQueueExecutor(
chooseMaxThreads(options),
Expand Down Expand Up @@ -900,13 +911,9 @@ private interface StreamingWorkerStatusReporterFactory {
@AutoValue
abstract static class ConfigFetcherComputationStateCacheAndWindmillClient {

private static ConfigFetcherComputationStateCacheAndWindmillClient create(
ComputationConfig.Fetcher configFetcher,
ComputationStateCache computationStateCache,
WindmillServerStub windmillServer,
GrpcWindmillStreamFactory windmillStreamFactory) {
return new AutoValue_StreamingDataflowWorker_ConfigFetcherComputationStateCacheAndWindmillClient(
configFetcher, computationStateCache, windmillServer, windmillStreamFactory);
private static Builder builder() {
return new AutoValue_StreamingDataflowWorker_ConfigFetcherComputationStateCacheAndWindmillClient
.Builder();
}

abstract ComputationConfig.Fetcher configFetcher();
Expand All @@ -916,6 +923,23 @@ private static ConfigFetcherComputationStateCacheAndWindmillClient create(
abstract WindmillServerStub windmillServer();

abstract GrpcWindmillStreamFactory windmillStreamFactory();

abstract @Nullable GrpcDispatcherClient windmillDispatcherClient();

@AutoValue.Builder
abstract static class Builder {
abstract Builder setConfigFetcher(ComputationConfig.Fetcher value);

abstract Builder setComputationStateCache(ComputationStateCache value);

abstract Builder setWindmillServer(WindmillServerStub value);

abstract Builder setWindmillStreamFactory(GrpcWindmillStreamFactory value);

abstract Builder setWindmillDispatcherClient(GrpcDispatcherClient value);

abstract ConfigFetcherComputationStateCacheAndWindmillClient build();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ private synchronized ImmutableMap<ShardedKey, WorkId> getStuckCommitsAt(
if (executableWork != null) {
Work work = executableWork.work();
if (work.isStuckCommittingAt(stuckCommitDeadline)) {
LOG.error(
LOG.debug(
"Detected key {} stuck in COMMITTING state since {}, completing it with error.",
shardedKey,
work.getStateStartTime());
Expand All @@ -338,7 +338,7 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) {
"<table border=\"1\" "
+ "style=\"border-collapse:collapse;padding:5px;border-spacing:5px;border:1px\">");
writer.println(
"<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active For</th><th>State</th><th>State Active For</th></tr>");
"<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active For</th><th>State</th><th>State Active For</th><th>Produced By</th></tr>");
// Use StringBuilder because we are appending in loop.
StringBuilder activeWorkStatus = new StringBuilder();
int commitsPendingCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,18 @@ public static ProcessingContext createProcessingContext(
GetDataClient getDataClient,
Consumer<Commit> workCommitter,
HeartbeatSender heartbeatSender) {
return ProcessingContext.create(computationId, getDataClient, workCommitter, heartbeatSender);
return ProcessingContext.create(
computationId, getDataClient, workCommitter, heartbeatSender, "");
}

public static ProcessingContext createProcessingContext(
String backendWorkerToken,
String computationId,
GetDataClient getDataClient,
Consumer<Commit> workCommitter,
HeartbeatSender heartbeatSender) {
return ProcessingContext.create(
computationId, getDataClient, workCommitter, heartbeatSender, backendWorkerToken);
}

private static LatencyAttribution.Builder createLatencyAttributionWithActiveLatencyBreakdown(
Expand Down Expand Up @@ -167,6 +178,10 @@ public GlobalData fetchSideInput(GlobalDataRequest request) {
return processingContext.getDataClient().getSideInputData(request);
}

public String backendWorkerToken() {
return processingContext.backendWorkerToken();
}

public Watermarks watermarks() {
return watermarks;
}
Expand Down Expand Up @@ -342,9 +357,10 @@ private static ProcessingContext create(
String computationId,
GetDataClient getDataClient,
Consumer<Commit> workCommitter,
HeartbeatSender heartbeatSender) {
HeartbeatSender heartbeatSender,
String backendWorkerToken) {
return new AutoValue_Work_ProcessingContext(
computationId, getDataClient, heartbeatSender, workCommitter);
computationId, getDataClient, heartbeatSender, workCommitter, backendWorkerToken);
}

/** Computation that the {@link Work} belongs to. */
Expand All @@ -361,6 +377,8 @@ private static ProcessingContext create(
*/
public abstract Consumer<Commit> workCommitter();

public abstract String backendWorkerToken();

private Optional<KeyedGetDataResponse> fetchKeyedState(KeyedGetDataRequest request) {
return Optional.ofNullable(getDataClient().getStateData(computationId(), request));
}
Expand Down
Loading

0 comments on commit 1e02463

Please sign in to comment.