Skip to content

Commit

Permalink
[hotfix][connectors] Improve the typo and code style
Browse files Browse the repository at this point in the history
  • Loading branch information
leonardBang committed Jan 23, 2022
1 parent d864ce3 commit 0dd661f
Show file tree
Hide file tree
Showing 17 changed files with 33 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class PulsarSourceITCase extends SourceTestSuiteBase<String> {
@TestExternalSystem
PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());

// Defines a external context Factories,
// Defines an external context Factories,
// so test cases will be invoked using this external contexts.
@TestContext
PulsarTestContextFactory<String, SingleTopicConsumingContext> singleTopic =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

/** Common test context for pulsar based test. */
public abstract class PulsarTestContext<T> implements DataStreamSourceExternalContext<T> {
private static final long serialVersionUID = 4717940854368532130L;
private static final long serialVersionUID = 1L;

private static final int NUM_RECORDS_UPPER_BOUND = 500;
private static final int NUM_RECORDS_LOWER_BOUND = 100;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* splits.
*/
public class MultipleTopicConsumingContext extends MultipleTopicTemplateContext {
private static final long serialVersionUID = -3855336888090886528L;
private static final long serialVersionUID = 1L;

public MultipleTopicConsumingContext(PulsarTestEnvironment environment) {
this(environment, Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
* source splits.
*/
public abstract class MultipleTopicTemplateContext extends PulsarTestContext<String> {
private static final long serialVersionUID = 7333807392445848344L;
private static final long serialVersionUID = 1L;

private int numTopics = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
* source splits.
*/
public class SingleTopicConsumingContext extends PulsarTestContext<String> {
private static final long serialVersionUID = 2754642285356345741L;
private static final long serialVersionUID = 1L;

private static final String TOPIC_NAME_PREFIX = "pulsar-single-topic";
private final String topicName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<S
new PulsarTestEnvironment(container(flink.getFlinkContainers().getJobManager()));

// Defines a set of external context Factories for different test cases.
@SuppressWarnings("unused")
@TestContext
PulsarTestContextFactory<String, SharedSubscriptionContext> shared =
new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new);

@SuppressWarnings("unused")
@TestContext
PulsarTestContextFactory<String, KeySharedSubscriptionContext> keyShared =
new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;

/** We would consuming from test splits by using {@link SubscriptionType#Exclusive} subscription. */
/** We would consume from test splits by using {@link SubscriptionType#Exclusive} subscription. */
public class ExclusiveSubscriptionContext extends MultipleTopicTemplateContext {
private static final long serialVersionUID = 6238209089442257487L;
private static final long serialVersionUID = 1L;

public ExclusiveSubscriptionContext(PulsarTestEnvironment environment) {
this(environment, Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;

/** We would consuming from test splits by using {@link SubscriptionType#Failover} subscription. */
/** We would consume from test splits by using {@link SubscriptionType#Failover} subscription. */
public class FailoverSubscriptionContext extends MultipleTopicTemplateContext {
private static final long serialVersionUID = 6238209089442257487L;
private static final long serialVersionUID = 1L;

public FailoverSubscriptionContext(PulsarTestEnvironment environment) {
this(environment, Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,9 @@
import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
import static org.apache.pulsar.client.api.Schema.STRING;

/**
* We would consuming from test splits by using {@link SubscriptionType#Key_Shared} subscription.
*/
/** We would consume from test splits by using {@link SubscriptionType#Key_Shared} subscription. */
public class KeySharedSubscriptionContext extends PulsarTestContext<String> {
private static final long serialVersionUID = 3246516520107893983L;
private static final long serialVersionUID = 1L;

private int index = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

/** We would consuming from test splits by using {@link SubscriptionType#Shared} subscription. */
public class SharedSubscriptionContext extends PulsarTestContext<String> {
private static final long serialVersionUID = -2798707923661295245L;
private static final long serialVersionUID = 1L;

private int index = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ public AvroBulkFormatExternalContext createExternalContext(String testName) {
return new AvroBulkFormatExternalContext(blocksPerFile);
}
}

@Override
public String toString() {
return "AvroBulkFormatExternalContext{" + "blocksPerFile=" + blocksPerFile + '}';
}
}

private static class AvroBulkFormatExternalSystemSplitDataWriter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.testframe.external;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.source.Source;

Expand All @@ -31,6 +32,7 @@
* external system, such as creating instance of {@link Source} and {@link Sink}, generating test
* data, and creating data readers or writers for validating the correctness of test data.
*/
@Experimental
public interface ExternalContext extends AutoCloseable {

/** Get URL of connector JARs that will be attached to job graphs when submitting Flink jobs. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

package org.apache.flink.connector.testframe.external;

import org.apache.flink.annotation.Experimental;

/** Factory for creating {@link ExternalContext}. */
@Experimental
public interface ExternalContextFactory<C extends ExternalContext> {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.testframe.external.sink;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.connector.testframe.external.ExternalContext;
Expand All @@ -30,6 +31,7 @@
*
* @param <T> Type of elements before serialization by sink
*/
@Experimental
public interface DataStreamSinkExternalContext<T> extends ExternalContext, ResultTypeQueryable<T> {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.testframe.external.sink;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.connector.testframe.external.ExternalContext;
import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
import org.apache.flink.table.data.RowData;
Expand All @@ -31,7 +32,9 @@
* <p>Comparing with {@link DataStreamSinkExternalContext}, the data type of this external context
* is fixed as {@link RowData} to test functionality of table source.
*/
@Experimental
public interface TableSinkExternalContext extends ExternalContext {

/** Get table options for building DDL of the connector sink table. */
Map<String, String> getSinkTableOptions(TestingSinkSettings sinkSettings)
throws UnsupportedOperationException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.testframe.external.source;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.connector.testframe.external.ExternalContext;
import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
import org.apache.flink.table.data.RowData;
Expand All @@ -31,7 +32,9 @@
* <p>Comparing with {@link DataStreamSourceExternalContext}, the data type of this external context
* is fixed as {@link RowData} to test functionality of table source.
*/
@Experimental
public interface TableSourceExternalContext extends ExternalContext {

/** Get table options for building DDL of the connector source table. */
Map<String, String> getSourceTableOptions(TestingSourceSettings sourceSettings)
throws UnsupportedOperationException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContex
context.getStore(TEST_RESOURCE_NAMESPACE)
.get(EXTERNAL_CONTEXT_FACTORIES_STORE_KEY);

// Create a invocation context for each external context factory
// Create an invocation context for each external context factory
return externalContextFactories.stream()
.map(
factory ->
Expand Down

0 comments on commit 0dd661f

Please sign in to comment.