Skip to content

Commit

Permalink
[FLINK-25107][formats/gsr] Migrate tests to use Assert4J
Browse files Browse the repository at this point in the history
  • Loading branch information
dannycranmer committed Jan 11, 2022
1 parent 46e1e60 commit 785ca59
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.ClassRule;
Expand All @@ -60,6 +59,7 @@
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ACCESS_KEY_ID;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_SECRET_ACCESS_KEY;
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
import static org.assertj.core.api.Assertions.assertThat;

/** End-to-end test for Glue Schema Registry AVRO format using Kinesalite. */
public class GlueSchemaRegistryAvroKinesisITCase extends TestLogger {
Expand Down Expand Up @@ -133,11 +133,7 @@ public void testGSRAvroGenericFormatWithFlink() throws Exception {
}
log.info("results: {}", results);

Assert.assertEquals(
"Results received from '" + OUTPUT_STREAM + "': " + results,
messages.size(),
results.size());
Assert.assertTrue(messages.containsAll(results));
assertThat(results).containsExactlyInAnyOrderElementsOf(messages);
}

private FlinkKinesisConsumer<GenericRecord> createSource() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.ClassRule;
Expand All @@ -54,6 +53,7 @@
import java.util.concurrent.TimeUnit;

import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
import static org.assertj.core.api.Assertions.assertThat;

/** End-to-end test for Glue Schema Registry Json format using Kinesalite. */
public class GlueSchemaRegistryJsonKinesisITCase extends TestLogger {
Expand Down Expand Up @@ -128,11 +128,7 @@ public void testGSRJsonGenericFormatWithFlink() throws Exception {
}
log.info("results: {}", results);

Assert.assertEquals(
"Results received from '" + OUTPUT_STREAM + "': " + results,
messages.size(),
results.size());
Assert.assertTrue(messages.containsAll(results));
assertThat(results).containsExactlyInAnyOrderElementsOf(messages);
}

private FlinkKinesisConsumer<JsonDataWithSchema> createSource() {
Expand All @@ -141,13 +137,11 @@ private FlinkKinesisConsumer<JsonDataWithSchema> createSource() {
STREAM_INITIAL_POSITION,
ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());

FlinkKinesisConsumer<JsonDataWithSchema> consumer =
new FlinkKinesisConsumer<>(
INPUT_STREAM,
new GlueSchemaRegistryJsonDeserializationSchema<>(
JsonDataWithSchema.class, INPUT_STREAM, getConfigs()),
properties);
return consumer;
return new FlinkKinesisConsumer<>(
INPUT_STREAM,
new GlueSchemaRegistryJsonDeserializationSchema<>(
JsonDataWithSchema.class, INPUT_STREAM, getConfigs()),
properties);
}

private FlinkKinesisProducer<JsonDataWithSchema> createSink() throws Exception {
Expand Down Expand Up @@ -208,7 +202,7 @@ private List<JsonDataWithSchema> getGenericRecords() {
}

private Map<String, Object> getConfigs() {
Map<String, Object> configs = new HashMap();
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "ca-central-1");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@
import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link GlueSchemaRegistryAvroDeserializationSchema}. */
public class GlueSchemaRegistryAvroDeserializationSchemaTest extends TestLogger {
Expand All @@ -53,22 +51,18 @@ public static void setup() throws IOException {
/** Test whether forGeneric method works. */
@Test
public void testForGeneric_withValidParams_succeeds() {
assertThat(
GlueSchemaRegistryAvroDeserializationSchema.forGeneric(userSchema, configs),
notNullValue());
assertThat(
GlueSchemaRegistryAvroDeserializationSchema.forGeneric(userSchema, configs),
instanceOf(GlueSchemaRegistryAvroDeserializationSchema.class));
assertThat(GlueSchemaRegistryAvroDeserializationSchema.forGeneric(userSchema, configs))
.isNotNull();
assertThat(GlueSchemaRegistryAvroDeserializationSchema.forGeneric(userSchema, configs))
.isInstanceOf(GlueSchemaRegistryAvroDeserializationSchema.class);
}

/** Test whether forSpecific method works. */
@Test
public void testForSpecific_withValidParams_succeeds() {
assertThat(
GlueSchemaRegistryAvroDeserializationSchema.forSpecific(User.class, configs),
notNullValue());
assertThat(
GlueSchemaRegistryAvroDeserializationSchema.forSpecific(User.class, configs),
instanceOf(GlueSchemaRegistryAvroDeserializationSchema.class));
assertThat(GlueSchemaRegistryAvroDeserializationSchema.forSpecific(User.class, configs))
.isNotNull();
assertThat(GlueSchemaRegistryAvroDeserializationSchema.forSpecific(User.class, configs))
.isInstanceOf(GlueSchemaRegistryAvroDeserializationSchema.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import lombok.NonNull;
import org.apache.avro.Schema;
import org.hamcrest.Matchers;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -48,9 +47,7 @@
import java.util.Map;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link GlueSchemaRegistryAvroSchemaCoder}. */
public class GlueSchemaRegistryAvroSchemaCoderTest extends TestLogger {
Expand Down Expand Up @@ -102,29 +99,21 @@ public static void setup() throws IOException {
/** Test whether constructor works. */
@Test
public void testConstructor_withConfigs_succeeds() {
assertThat(new GlueSchemaRegistryAvroSchemaCoder(testTopic, configs), notNullValue());
assertThat(new GlueSchemaRegistryAvroSchemaCoder(testTopic, configs)).isNotNull();
}

/**
* Test whether readSchema method works.
*
* @throws IOException
*/
/** Test whether readSchema method works. */
@Test
public void testReadSchema_withValidParams_succeeds() throws IOException {
GlueSchemaRegistryAvroSchemaCoder glueSchemaRegistryAvroSchemaCoder =
new GlueSchemaRegistryAvroSchemaCoder(mockInputStreamDeserializer);
Schema resultSchema =
glueSchemaRegistryAvroSchemaCoder.readSchema(buildByteArrayInputStream());

assertThat(resultSchema, equalTo(userSchema));
assertThat(resultSchema).isEqualTo(userSchema);
}

/**
* Test whether writeSchema method works.
*
* @throws IOException
*/
/** Test whether writeSchema method works. */
@Test
public void testWriteSchema_withValidParams_succeeds() throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Expand All @@ -136,12 +125,7 @@ public void testWriteSchema_withValidParams_succeeds() throws IOException {
testForSerializedData(outputStream.toByteArray());
}

/**
* Test whether writeSchema method throws exception if auto registration un-enabled.
*
* @throws NoSuchFieldException
* @throws IllegalAccessException
*/
/** Test whether writeSchema method throws exception if auto registration un-enabled. */
@Test
public void testWriteSchema_withoutAutoRegistration_throwsException() throws IOException {
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, false);
Expand All @@ -167,12 +151,12 @@ public void testWriteSchema_withoutAutoRegistration_throwsException() throws IOE
}

private void testForSerializedData(byte[] serializedData) {
assertThat(serializedData, Matchers.notNullValue());
assertThat(serializedData).isNotNull();

ByteBuffer buffer = getByteBuffer(serializedData);
byte headerVersionByte = getByte(buffer);

assertThat(headerVersionByte, equalTo(AWSSchemaRegistryConstants.HEADER_VERSION_BYTE));
assertThat(headerVersionByte).isEqualTo(AWSSchemaRegistryConstants.HEADER_VERSION_BYTE);
}

private ByteArrayInputStream buildByteArrayInputStream() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,7 @@
import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link GlueSchemaRegistryAvroSerializationSchema}. */
public class GlueSchemaRegistryAvroSerializationSchemaTest extends TestLogger {
Expand All @@ -52,10 +48,10 @@ public class GlueSchemaRegistryAvroSerializationSchemaTest extends TestLogger {
};
private static Schema userSchema;
private static User userDefinedPojo;
private static Map<String, Object> configs = new HashMap<>();
private static Map<String, String> metadata = new HashMap<>();
private static final Map<String, Object> configs = new HashMap<>();
private static final Map<String, String> metadata = new HashMap<>();
private static GlueSchemaRegistryConfiguration glueSchemaRegistryConfiguration;
private static AwsCredentialsProvider credentialsProvider =
private static final AwsCredentialsProvider credentialsProvider =
DefaultCredentialsProvider.builder().build();
private static GlueSchemaRegistrySerializationFacade mockSerializationFacade;

Expand Down Expand Up @@ -87,26 +83,26 @@ public static void setup() throws IOException {
@Test
public void testForGeneric_withValidParams_succeeds() {
assertThat(
GlueSchemaRegistryAvroSerializationSchema.forGeneric(
userSchema, testTopic, configs),
notNullValue());
GlueSchemaRegistryAvroSerializationSchema.forGeneric(
userSchema, testTopic, configs))
.isNotNull();
assertThat(
GlueSchemaRegistryAvroSerializationSchema.forGeneric(
userSchema, testTopic, configs),
instanceOf(GlueSchemaRegistryAvroSerializationSchema.class));
GlueSchemaRegistryAvroSerializationSchema.forGeneric(
userSchema, testTopic, configs))
.isInstanceOf(GlueSchemaRegistryAvroSerializationSchema.class);
}

/** Test whether forSpecific method works. */
@Test
public void testForSpecific_withValidParams_succeeds() {
assertThat(
GlueSchemaRegistryAvroSerializationSchema.forSpecific(
User.class, testTopic, configs),
notNullValue());
GlueSchemaRegistryAvroSerializationSchema.forSpecific(
User.class, testTopic, configs))
.isNotNull();
assertThat(
GlueSchemaRegistryAvroSerializationSchema.forSpecific(
User.class, testTopic, configs),
instanceOf(GlueSchemaRegistryAvroSerializationSchema.class));
GlueSchemaRegistryAvroSerializationSchema.forSpecific(
User.class, testTopic, configs))
.isInstanceOf(GlueSchemaRegistryAvroSerializationSchema.class);
}

/** Test whether serialize method when compression is not enabled works. */
Expand All @@ -121,13 +117,13 @@ public void testSerialize_withValidParams_withoutCompression_succeeds() {
testTopic, configs, mockSerializationFacade);
GlueSchemaRegistryAvroSchemaCoder glueSchemaRegistryAvroSchemaCoder =
new GlueSchemaRegistryAvroSchemaCoder(glueSchemaRegistryOutputStreamSerializer);
GlueSchemaRegistryAvroSerializationSchema glueSchemaRegistryAvroSerializationSchema =
new GlueSchemaRegistryAvroSerializationSchema(
GlueSchemaRegistryAvroSerializationSchema<User> glueSchemaRegistryAvroSerializationSchema =
new GlueSchemaRegistryAvroSerializationSchema<>(
User.class, null, glueSchemaRegistryAvroSchemaCoder);

byte[] serializedData =
glueSchemaRegistryAvroSerializationSchema.serialize(userDefinedPojo);
assertThat(serializedData, equalTo(specificBytes));
assertThat(serializedData).isEqualTo(specificBytes);
}

/** Test whether serialize method when compression is enabled works. */
Expand All @@ -142,22 +138,22 @@ public void testSerialize_withValidParams_withCompression_succeeds() {
testTopic, configs, mockSerializationFacade);
GlueSchemaRegistryAvroSchemaCoder glueSchemaRegistryAvroSchemaCoder =
new GlueSchemaRegistryAvroSchemaCoder(glueSchemaRegistryOutputStreamSerializer);
GlueSchemaRegistryAvroSerializationSchema glueSchemaRegistryAvroSerializationSchema =
new GlueSchemaRegistryAvroSerializationSchema(
GlueSchemaRegistryAvroSerializationSchema<User> glueSchemaRegistryAvroSerializationSchema =
new GlueSchemaRegistryAvroSerializationSchema<>(
User.class, null, glueSchemaRegistryAvroSchemaCoder);

byte[] serializedData =
glueSchemaRegistryAvroSerializationSchema.serialize(userDefinedPojo);
assertThat(serializedData, equalTo(specificBytes));
assertThat(serializedData).isEqualTo(specificBytes);
}

/** Test whether serialize method returns null when input object is null. */
@Test
public void testSerialize_withNullObject_returnNull() {
GlueSchemaRegistryAvroSerializationSchema glueSchemaRegistryAvroSerializationSchema =
GlueSchemaRegistryAvroSerializationSchema<User> glueSchemaRegistryAvroSerializationSchema =
GlueSchemaRegistryAvroSerializationSchema.forSpecific(
User.class, testTopic, configs);
assertThat(glueSchemaRegistryAvroSerializationSchema.serialize(null), nullValue());
assertThat(glueSchemaRegistryAvroSerializationSchema.serialize(null)).isNull();
}

private static class MockGlueSchemaRegistrySerializationFacade
Expand Down
Loading

0 comments on commit 785ca59

Please sign in to comment.