Skip to content

Commit

Permalink
Added a unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
venkata91 committed Jun 21, 2024
1 parent f87a08b commit bcc1550
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,12 @@ public void handleSourceEvent(int subTaskId, int attemptNumber, SourceEvent sour
LOG.info(
"Received request split event from subtask {} attempt {}", subTaskId, attemptNumber);
enumeratorContext.assignSplit(splitIdAssignedBySubtask.get(subTaskId), subTaskId);
} else {
LOG.info("Received request split event from subtask {}", subTaskId);
} else if (attemptNumber == 0) {
LOG.warn(
"Received request split event unexpectedly from subtask {} for speculative execution",
subTaskId);
} else if (!splitIdAssignedBySubtask.containsKey(subTaskId)) {
LOG.info("Looks like the subtask {} failed already and got cleaned up", subTaskId);
}
} else {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,41 @@ public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exceptio
.contains(splits.get(0));
}

@Test
public void testSpeculativeExecutionAttempt() throws Exception {
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner();
TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
new TestingSplitEnumeratorContext<>(4);
ScanContext config =
ScanContext.builder()
.streaming(true)
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, config, splitPlanner);

// register one reader, and let it request a split
enumeratorContext.registerReader(2, "localhost");
enumerator.addReader(2);
enumerator.handleSourceEvent(2, new SplitRequestEvent());

// make one split available and trigger the periodic discovery
List<IcebergSourceSplit> splits =
SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
splitPlanner.addSplits(splits, IcebergEnumeratorPosition.of(1L, 1L));
enumeratorContext.triggerAllActions();

List<IcebergSourceSplit> assignedSplits =
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits();

enumerator.handleSourceEvent(2, 2, new SplitRequestEvent());

List<IcebergSourceSplit> speculatedAssignedSplits =
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits();

Assert.assertEquals(assignedSplits, speculatedAssignedSplits);
}

private static ContinuousIcebergEnumerator createEnumerator(
SplitEnumeratorContext<IcebergSourceSplit> context,
ScanContext scanContext,
Expand Down

0 comments on commit bcc1550

Please sign in to comment.