Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Support for Flink's SpeculativeExecution in batch execution mode #10548

Merged
merged 9 commits into from
Jul 24, 2024
Next commit
Support SpeculativeExecution with Flink in batch execution mode
  • Loading branch information
venkata91 committed Jun 30, 2024
commit 7bcfdb2f6d1d8ac899d4fdaa642414321738f057
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SupportsHandleExecutionAttemptSourceEvent;
import org.apache.iceberg.flink.source.assigner.GetSplitResult;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
Expand All @@ -37,7 +38,8 @@
import org.slf4j.LoggerFactory;

abstract class AbstractIcebergEnumerator
implements SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> {
implements SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState>,
SupportsHandleExecutionAttemptSourceEvent {
private static final Logger LOG = LoggerFactory.getLogger(AbstractIcebergEnumerator.class);

private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
Expand Down Expand Up @@ -95,6 +97,13 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
}
}

/**
 * Handles a source event sent by a specific execution attempt of a subtask.
 *
 * <p>The attempt number is intentionally not used here: Flink's SourceCoordinator already keeps
 * track of the subtask-to-splits mapping and takes care of re-assigning splits to speculated
 * attempts as well, so the event is simply forwarded to the attempt-agnostic overload.
 *
 * @param subTaskId index of the subtask that sent the event
 * @param attemptNumber attempt number of the sending execution; ignored by this implementation
 * @param sourceEvent the event to handle
 */
@Override
public void handleSourceEvent(int subTaskId, int attemptNumber, SourceEvent sourceEvent) {
  // Delegate to the two-argument overload; per-attempt bookkeeping is left to Flink.
  this.handleSourceEvent(subTaskId, sourceEvent);
}

@Override
public void addSplitsBack(List<IcebergSourceSplit> splits, int subtaskId) {
LOG.info("Add {} splits back to the pool for failed subtask {}", splits.size(), subtaskId);
Expand Down