Skip to content

Commit

Permalink
[FLINK-21101][Azure] Add nightly profile to run all tests with the ad…
Browse files Browse the repository at this point in the history
…aptive scheduler
  • Loading branch information
rmetzger committed Feb 23, 2021
1 parent b7e93fd commit dc0c416
Show file tree
Hide file tree
Showing 18 changed files with 162 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.TestLogger;

import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.runners.Parameterized;

Expand Down Expand Up @@ -61,6 +63,7 @@ public static Collection<Object[]> params() {
}

@Test
@Category(FailsWithAdaptiveScheduler.class) // FLINK-21400
public void testFileSink() throws Exception {
String path = TEMPORARY_FOLDER.newFolder().getAbsolutePath();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionWithException;

import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

import java.io.File;
Expand Down Expand Up @@ -164,6 +166,7 @@ public void testContinuousTextFileSource() throws Exception {
* record format (text lines) and restarts TaskManager.
*/
@Test
@Category(FailsWithAdaptiveScheduler.class)
public void testContinuousTextFileSourceWithTaskManagerFailover() throws Exception {
testContinuousTextFileSource(FailoverType.TM);
}
Expand Down
97 changes: 55 additions & 42 deletions flink-end-to-end-tests/run-nightly-tests.sh

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
Expand Down Expand Up @@ -127,6 +128,7 @@
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.ClassLoaderUtils;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
Expand All @@ -141,6 +143,7 @@
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -762,6 +765,7 @@ public void testHeartbeatTimeoutWithResourceManager() throws Exception {
* submission.
*/
@Test
@Category(FailsWithAdaptiveScheduler.class) // FLINK-21398
public void testRestoringFromSavepoint() throws Exception {

// create savepoint data
Expand Down Expand Up @@ -803,6 +807,7 @@ public void testRestoringFromSavepoint() throws Exception {
* allowed.
*/
@Test
@Category(FailsWithAdaptiveScheduler.class) // FLINK-21398
public void testRestoringModifiedJobFromSavepoint() throws Exception {

// create savepoint data
Expand Down Expand Up @@ -860,6 +865,7 @@ public void testRestoringModifiedJobFromSavepoint() throws Exception {

/** Tests that an existing checkpoint will have precedence over an savepoint. */
@Test
@Category(FailsWithAdaptiveScheduler.class) // FLINK-21398
public void testCheckpointPrecedesSavepointRecovery() throws Exception {

// create savepoint data
Expand Down Expand Up @@ -1045,6 +1051,7 @@ public void testResourceManagerConnectionAfterStart() throws Exception {
* if this execution fails.
*/
@Test
@Category(FailsWithAdaptiveScheduler.class) // FLINK-21450
public void testRequestNextInputSplitWithLocalFailover() throws Exception {

configuration.setString(
Expand All @@ -1058,6 +1065,7 @@ public void testRequestNextInputSplitWithLocalFailover() throws Exception {
}

@Test
@Category(FailsWithAdaptiveScheduler.class) // FLINK-21399
public void testRequestNextInputSplitWithGlobalFailover() throws Exception {
configuration.setInteger(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
configuration.set(
Expand Down Expand Up @@ -1092,6 +1100,7 @@ private void runRequestNextInputSplitTest(
source.setInvokableClass(AbstractInvokable.class);

final JobGraph inputSplitJobGraph = new JobGraph(source);
jobGraph.setJobType(JobType.STREAMING);

final ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 0));
Expand Down Expand Up @@ -1926,6 +1935,7 @@ private JobGraph createJobGraphWithCheckpointing(
private JobGraph createJobGraphFromJobVerticesWithCheckpointing(
SavepointRestoreSettings savepointRestoreSettings, JobVertex... jobVertices) {
final JobGraph jobGraph = new JobGraph(jobVertices);
jobGraph.setJobType(JobType.STREAMING);

// enable checkpointing which is required to resume from a savepoint
final CheckpointCoordinatorConfiguration checkpoinCoordinatorConfiguration =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory.changelog
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase
import org.apache.flink.table.planner.runtime.utils.TestData.{data1, nullData4, smallTupleData3, tupleData3, tupleData5}
import org.apache.flink.table.utils.LegacyRowResource
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler
import org.apache.flink.util.ExceptionUtils

import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
import org.junit.experimental.categories.Category
import org.junit.{Rule, Test}

import java.lang.{Long => JLong}
import java.math.{BigDecimal => JBigDecimal}
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConversions._
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -691,6 +691,7 @@ class TableSinkITCase extends StreamingTestBase {
}

@Test
@Category(Array(classOf[FailsWithAdaptiveScheduler])) // FLINK-21403
def testParallelismWithSinkFunction(): Unit = {
val negativeParallelism = -1
val validParallelism = 1
Expand Down Expand Up @@ -732,6 +733,7 @@ class TableSinkITCase extends StreamingTestBase {
}

@Test
@Category(Array(classOf[FailsWithAdaptiveScheduler])) // FLINK-21403
def testParallelismWithOutputFormat(): Unit = {
val negativeParallelism = -1
val oversizedParallelism = Int.MaxValue
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.testutils.junit;

/** Marker for explicitly ignoring a test which fails with adaptive scheduler. */
public interface FailsWithAdaptiveScheduler {}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@
import org.apache.flink.test.checkpointing.utils.IntType;
import org.apache.flink.test.checkpointing.utils.ValidatingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;

import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
Expand All @@ -53,6 +55,7 @@
* <p>This is a version of {@link EventTimeWindowCheckpointingITCase} for All-Windows.
*/
@SuppressWarnings("serial")
@Category(FailsWithAdaptiveScheduler.class) // FLINK-21400
public class EventTimeAllWindowCheckpointingITCase extends TestLogger {

private static final int PARALLELISM = 4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.flink.test.checkpointing.utils.IntType;
import org.apache.flink.test.checkpointing.utils.ValidatingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;

Expand All @@ -58,6 +59,7 @@
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
Expand All @@ -84,6 +86,7 @@
*/
@SuppressWarnings("serial")
@RunWith(Parameterized.class)
@Category(FailsWithAdaptiveScheduler.class) // FLINK-21400
public class EventTimeWindowCheckpointingITCase extends TestLogger {

private static final int MAX_MEM_STATE_SIZE = 20 * 1024 * 1024;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.TestLogger;

import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand All @@ -46,6 +48,7 @@
* EventTimeWindowCheckpointingITCase}.
*/
@RunWith(Parameterized.class)
@Category(FailsWithAdaptiveScheduler.class) // FLINK-21400
public class LocalRecoveryITCase extends TestLogger {

@Rule public TestName testName = new TestName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@
import org.apache.flink.test.checkpointing.utils.IntType;
import org.apache.flink.test.checkpointing.utils.ValidatingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;

import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.util.Map;

Expand All @@ -56,6 +58,7 @@
* handled correctly.
*/
@SuppressWarnings("serial")
@Category(FailsWithAdaptiveScheduler.class) // FLINK-21400
public class ProcessingTimeWindowCheckpointingITCase extends TestLogger {

private static final int PARALLELISM = 4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.EntropyInjectingTestFileSystem;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
Expand All @@ -74,6 +75,7 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -440,6 +442,7 @@ private static boolean ischeckpointcoordinatorshutdownError(Throwable throwable)
}

@Test
@Category(FailsWithAdaptiveScheduler.class) // FLINK-21333
public void testStopSavepointWithBoundedInput() throws Exception {
final int numTaskManagers = 2;
final int numSlotsPerTaskManager = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.test.checkpointing.utils.AccumulatingIntegerSink;
import org.apache.flink.test.checkpointing.utils.CancellingIntegerSource;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.TestLogger;

Expand All @@ -38,6 +39,7 @@
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -69,6 +71,7 @@
* enabled/disabled).
*/
@RunWith(Parameterized.class)
@Category(FailsWithAdaptiveScheduler.class) // FLINK-21333
public class UnalignedCheckpointCompatibilityITCase extends TestLogger {

@ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.Collector;

import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

Expand Down Expand Up @@ -95,6 +97,7 @@
* </ul>
*/
@RunWith(Parameterized.class)
@Category(FailsWithAdaptiveScheduler.class) // FLINK-21400
public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {

@Parameterized.Parameters(name = "{0}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.TestLogger;

import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;

import org.junit.Rule;
import org.junit.experimental.categories.Category;
import org.junit.rules.ErrorCollector;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
Expand All @@ -85,6 +87,7 @@
import static org.junit.Assert.fail;

/** Base class for tests related to unaligned checkpoints. */
@Category(FailsWithAdaptiveScheduler.class) // FLINK-21400
public abstract class UnalignedCheckpointTestBase extends TestLogger {
protected static final Logger LOG = LoggerFactory.getLogger(UnalignedCheckpointTestBase.class);
protected static final String NUM_OUTPUTS = "outputs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,15 @@
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;

import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -159,6 +161,7 @@ public void testWatermarkPropagation() throws Exception {
}

@Test
@Category(FailsWithAdaptiveScheduler.class) // FLINK-21333
public void testWatermarkPropagationNoFinalWatermarkOnStop() throws Exception {

// for this test to work, we need to be sure that no other jobs are being executed
Expand Down
Loading

0 comments on commit dc0c416

Please sign in to comment.