Skip to content
/ beam Public
forked from apache/beam

Commit

Permalink
Fix two flaky tests (apache#30278)
Browse files Browse the repository at this point in the history
* Fix a flaky test caused by race condition

* Use notify and wait per reviewer request

* Increase assert threshold to allow more room for extra threads

* Move synchronized block outside of assertThrows

* Replace this with isReady when defining synchronized block
  • Loading branch information
shunping committed Feb 11, 2024
1 parent 34cfcc3 commit 6d10e0f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
Expand Down Expand Up @@ -146,10 +147,15 @@ public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exceptio
Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, (value) -> {})),
Collections.emptyList());

AtomicBoolean isReady = new AtomicBoolean(false);
Future<?> future =
executor.submit(
() -> {
observer.accept(dataWith("ABC"));
synchronized (isReady) {
isReady.set(true);
isReady.notify();
}
assertThrows(
BeamFnDataInboundObserver.CloseException.class,
() -> {
Expand All @@ -165,6 +171,11 @@ public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exceptio
Future<?> future2 =
executor.submit(
() -> {
synchronized (isReady) {
while (!isReady.get()) {
isReady.wait();
}
}
observer.close();
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception {
LOG.info("Created {} threads to execute at most 100 parallel tasks", largestPool);
// Ideally we would never create more than 100, however with contention it is still possible
// some extra threads will be created.
assertTrue(largestPool <= 104);
assertTrue(largestPool <= 110);
executorService.shutdown();
}
}

0 comments on commit 6d10e0f

Please sign in to comment.