Skip to content

Commit

Permalink
[FLINK-27045][tests] Remove shared executor
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Apr 8, 2022
1 parent 7d22a0c commit f1867d3
Show file tree
Hide file tree
Showing 144 changed files with 1,643 additions and 807 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,19 @@
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLoggerExtension;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -73,6 +76,10 @@
@ExtendWith(TestLoggerExtension.class)
public class ApplicationDispatcherBootstrapITCase {

@RegisterExtension
static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
TestingUtils.defaultExecutorExtension();

private static Supplier<DispatcherResourceManagerComponentFactory>
createApplicationModeDispatcherResourceManagerComponentFactorySupplier(
Configuration configuration, PackagedProgram program) {
Expand Down Expand Up @@ -103,7 +110,7 @@ public void testDispatcherRecoversAfterLosingAndRegainingLeadership() throws Exc
.setConfiguration(configuration)
.build();
final EmbeddedHaServicesWithLeadershipControl haServices =
new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
new EmbeddedHaServicesWithLeadershipControl(EXECUTOR_EXTENSION.getExecutor());
final TestingMiniCluster.Builder clusterBuilder =
TestingMiniCluster.newBuilder(clusterConfiguration)
.setHighAvailabilityServicesSupplier(() -> haServices)
Expand Down Expand Up @@ -170,7 +177,7 @@ public void testDirtyJobResultRecoveryInApplicationMode() throws Exception {
TestingJobResultStore.createSuccessfulJobResult(
ApplicationDispatcherBootstrap.ZERO_JOB_ID)));
final EmbeddedHaServicesWithLeadershipControl haServices =
new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()) {
new EmbeddedHaServicesWithLeadershipControl(EXECUTOR_EXTENSION.getExecutor()) {

@Override
public JobResultStore getJobResultStore() {
Expand Down Expand Up @@ -223,7 +230,7 @@ public void testSubmitFailedJobOnApplicationError() throws Exception {
.setConfiguration(configuration)
.build();
final EmbeddedHaServicesWithLeadershipControl haServices =
new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
new EmbeddedHaServicesWithLeadershipControl(EXECUTOR_EXTENSION.getExecutor());
final TestingMiniCluster.Builder clusterBuilder =
TestingMiniCluster.newBuilder(clusterConfiguration)
.setHighAvailabilityServicesSupplier(() -> haServices)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.flink.testutils;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.executor.TestExecutorResource;

import java.time.Duration;
import java.util.UUID;
Expand All @@ -35,8 +35,6 @@ public class TestingUtils {
public static final Time TIMEOUT = Time.minutes(1L);
public static final Duration DEFAULT_AKKA_ASK_TIMEOUT = Duration.ofSeconds(200);

private static ScheduledExecutorService sharedExecutorInstance;

public static Time infiniteTime() {
return Time.milliseconds(Integer.MAX_VALUE);
}
Expand All @@ -47,16 +45,12 @@ public static Duration infiniteDuration() {
return Duration.ofDays(365L);
}

public static synchronized ScheduledExecutorService defaultExecutor() {
if (sharedExecutorInstance == null || sharedExecutorInstance.isShutdown()) {
sharedExecutorInstance = Executors.newSingleThreadScheduledExecutor();
}

return sharedExecutorInstance;
public static TestExecutorExtension<ScheduledExecutorService> defaultExecutorExtension() {
return new TestExecutorExtension<>(Executors::newSingleThreadScheduledExecutor);
}

public static ScheduledExecutor defaultScheduledExecutor() {
return new ScheduledExecutorServiceAdapter(defaultExecutor());
public static TestExecutorResource<ScheduledExecutorService> defaultExecutorResource() {
return new TestExecutorResource<>(Executors::newSingleThreadScheduledExecutor);
}

public static UUID zeroUUID() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -71,8 +70,8 @@
public class FutureUtilsTest extends TestLogger {

@ClassRule
public static final TestExecutorResource<ScheduledExecutorService> TEST_EXECUTOR_RESOURCE =
new TestExecutorResource<>(Executors::newSingleThreadScheduledExecutor);
public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
TestingUtils.defaultExecutorResource();

/** Tests that we can retry an operation. */
@Test
Expand All @@ -91,9 +90,9 @@ public void testRetrySuccess() throws Exception {
new FlinkException("Test exception"));
}
},
TestingUtils.defaultExecutor()),
EXECUTOR_RESOURCE.getExecutor()),
retries,
TestingUtils.defaultExecutor());
EXECUTOR_RESOURCE.getExecutor());

assertTrue(retryFuture.get());
assertEquals(retries, atomicInteger.get());
Expand All @@ -110,7 +109,7 @@ public void testRetryFailureFixedRetries() throws Throwable {
FutureUtils.completedExceptionally(
new FlinkException("Test exception")),
retries,
TestingUtils.defaultExecutor());
EXECUTOR_RESOURCE.getExecutor());

try {
retryFuture.get();
Expand Down Expand Up @@ -145,9 +144,9 @@ public void testRetryCancellation() throws Exception {
throw new CompletionException(
new FlinkException("Test exception"));
},
TestingUtils.defaultExecutor()),
EXECUTOR_RESOURCE.getExecutor()),
retries,
TestingUtils.defaultExecutor());
EXECUTOR_RESOURCE.getExecutor());

// await that we have failed once
notificationLatch.await();
Expand Down Expand Up @@ -191,12 +190,12 @@ public void testStopAtNonRetryableException() {
new FlinkException("Test exception"));
}
},
TestingUtils.defaultExecutor()),
EXECUTOR_RESOURCE.getExecutor()),
retries,
throwable ->
ExceptionUtils.findThrowable(throwable, FlinkException.class)
.isPresent(),
TestingUtils.defaultExecutor());
EXECUTOR_RESOURCE.getExecutor());

try {
retryFuture.get();
Expand All @@ -216,7 +215,7 @@ public void testRetryWithDelayRetryStrategyFailure() throws Throwable {
FutureUtils.completedExceptionally(
new FlinkException("Test exception")),
new FixedRetryStrategy(3, Duration.ofMillis(1L)),
TestingUtils.defaultScheduledExecutor());
new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()));

try {
retryFuture.get(TestingUtils.TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -248,7 +247,7 @@ public void testRetryWithDelayRetryStrategy() throws Exception {
},
new ExponentialBackoffRetryStrategy(
retries, Duration.ofMillis(2L), Duration.ofMillis(5L)),
TestingUtils.defaultScheduledExecutor());
new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()));

Boolean result = retryFuture.get();

Expand Down Expand Up @@ -336,7 +335,7 @@ public void testScheduleWithInfiniteDelayNeverSchedulesOperation() {
FutureUtils.scheduleWithDelay(
noOpRunnable,
TestingUtils.infiniteTime(),
TestingUtils.defaultScheduledExecutor());
new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()));

assertFalse(completableFuture.isDone());

Expand All @@ -360,7 +359,7 @@ public void testOrTimeout() throws Exception {

@Test
public void testRetryWithDelayRetryStrategyAndPredicate() throws Exception {
final ScheduledExecutorService retryExecutor = TEST_EXECUTOR_RESOURCE.getExecutor();
final ScheduledExecutorService retryExecutor = EXECUTOR_RESOURCE.getExecutor();
final String retryableExceptionMessage = "first exception";
class TestStringSupplier implements Supplier<CompletableFuture<String>> {
private final AtomicInteger counter = new AtomicInteger();
Expand Down Expand Up @@ -675,7 +674,7 @@ public void testSupplyAsyncFailure() throws Exception {
() -> {
throw testException;
},
TestingUtils.defaultExecutor());
EXECUTOR_RESOURCE.getExecutor());

try {
future.get();
Expand All @@ -691,7 +690,7 @@ public void testSupplyAsyncFailure() throws Exception {
public void testSupplyAsync() throws Exception {
final Object expectedResult = new Object();
final CompletableFuture<Object> future =
FutureUtils.supplyAsync(() -> expectedResult, TestingUtils.defaultExecutor());
FutureUtils.supplyAsync(() -> expectedResult, EXECUTOR_RESOURCE.getExecutor());

assertEquals(future.get(), expectedResult);
}
Expand Down Expand Up @@ -964,7 +963,7 @@ public void testGetWithoutExceptionWithoutFinishing() {
public void testSwitchExecutorForNormallyCompletedFuture() {
final CompletableFuture<String> source = new CompletableFuture<>();

final ExecutorService singleThreadExecutor = TEST_EXECUTOR_RESOURCE.getExecutor();
final ExecutorService singleThreadExecutor = EXECUTOR_RESOURCE.getExecutor();

final CompletableFuture<String> resultFuture =
FutureUtils.switchExecutor(source, singleThreadExecutor);
Expand Down Expand Up @@ -992,7 +991,7 @@ public void testSwitchExecutorForNormallyCompletedFuture() {
public void testSwitchExecutorForExceptionallyCompletedFuture() {
final CompletableFuture<String> source = new CompletableFuture<>();

final ExecutorService singleThreadExecutor = TEST_EXECUTOR_RESOURCE.getExecutor();
final ExecutorService singleThreadExecutor = EXECUTOR_RESOURCE.getExecutor();

final CompletableFuture<String> resultFuture =
FutureUtils.switchExecutor(source, singleThreadExecutor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.function.RunnableWithException;
Expand All @@ -40,8 +41,7 @@

import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -58,8 +58,8 @@ public class KubernetesMultipleComponentLeaderElectionDriverTest {
new TestingFatalErrorHandlerExtension();

@RegisterExtension
private static final TestExecutorExtension<ExecutorService> testExecutorExtension =
new TestExecutorExtension<>(Executors::newSingleThreadScheduledExecutor);
private static final TestExecutorExtension<ScheduledExecutorService> testExecutorExtension =
TestingUtils.defaultExecutorExtension();

@Test
public void testElectionDriverGainsLeadership() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import org.apache.flink.test.junit5.InjectClusterClient;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand All @@ -50,13 +52,18 @@
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests to verify JMX reporter functionality on the JobManager. */
class JMXJobManagerMetricTest {

@RegisterExtension
static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
TestingUtils.defaultExecutorExtension();

@RegisterExtension
private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
new MiniClusterExtension(
Expand Down Expand Up @@ -118,7 +125,7 @@ void testJobManagerJMXMetricAccess(@InjectClusterClient ClusterClient<?> client)
Time.milliseconds(10),
deadline,
status -> status == JobStatus.RUNNING,
TestingUtils.defaultScheduledExecutor())
new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))
.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);

MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.testutils.ClassLoaderUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
Expand Down Expand Up @@ -1347,7 +1346,7 @@ public void close() throws Exception {
Time.milliseconds(50),
deadline,
(jobStatus) -> jobStatus.equals(JobStatus.CANCELED),
TestingUtils.defaultScheduledExecutor());
new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()));
assertEquals(
JobStatus.CANCELED,
jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.TestLogger;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
Expand All @@ -53,7 +52,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -92,7 +90,6 @@ enum ProgramArgsParType {
CompletableFuture.completedFuture("shazam://localhost:12345");
static Time timeout = Time.seconds(10);
static Map<String, String> responseHeaders = Collections.emptyMap();
static Executor executor = TestingUtils.defaultExecutor();

private static Path jarWithManifest;
private static Path jarWithoutManifest;
Expand Down Expand Up @@ -129,7 +126,6 @@ static void init() throws Exception {
localAddressFuture = CompletableFuture.completedFuture("shazam://localhost:12345");
timeout = Time.seconds(10);
responseHeaders = Collections.emptyMap();
executor = TestingUtils.defaultExecutor();
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;

Expand All @@ -32,6 +34,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -43,6 +46,10 @@ public class JarHandlerTest extends TestLogger {

@ClassRule public static final TemporaryFolder TMP = new TemporaryFolder();

@ClassRule
public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
TestingUtils.defaultExecutorResource();

@Test
public void testPlanJar() throws Exception {
runTest("hello out!", "hello err!");
Expand All @@ -53,7 +60,9 @@ private static void runTest(String expectedCapturedStdOut, String expectedCaptur
final TestingDispatcherGateway restfulGateway =
TestingDispatcherGateway.newBuilder().build();

final JarHandlers handlers = new JarHandlers(TMP.newFolder().toPath(), restfulGateway);
final JarHandlers handlers =
new JarHandlers(
TMP.newFolder().toPath(), restfulGateway, EXECUTOR_RESOURCE.getExecutor());

final Path originalJar = Paths.get(System.getProperty("targetDir")).resolve(JAR_NAME);
final Path jar = Files.copy(originalJar, TMP.newFolder().toPath().resolve(JAR_NAME));
Expand Down
Loading

0 comments on commit f1867d3

Please sign in to comment.