From 785ca5914d7f1a462cf87b1661d9a76af0110234 Mon Sep 17 00:00:00 2001 From: Danny Cranmer Date: Mon, 20 Dec 2021 13:55:42 +0000 Subject: [PATCH] [FLINK-25107][formats/gsr] Migrate tests to use Assert4J --- .../GlueSchemaRegistryAvroKinesisITCase.java | 8 +- .../GlueSchemaRegistryJsonKinesisITCase.java | 22 ++---- ...RegistryAvroDeserializationSchemaTest.java | 24 +++--- ...GlueSchemaRegistryAvroSchemaCoderTest.java | 32 ++------ ...maRegistryAvroSerializationSchemaTest.java | 52 ++++++------- ...maRegistryInputStreamDeserializerTest.java | 77 +++++++------------ ...emaRegistryOutputStreamSerializerTest.java | 22 +++--- ...RegistryJsonDeserializationSchemaTest.java | 40 ++++------ ...GlueSchemaRegistryJsonSchemaCoderTest.java | 10 +-- ...maRegistryJsonSerializationSchemaTest.java | 58 +++++++------- 10 files changed, 136 insertions(+), 209 deletions(-) diff --git a/flink-end-to-end-tests/flink-glue-schema-registry-avro-test/src/test/java/org/apache/flink/glue/schema/registry/test/GlueSchemaRegistryAvroKinesisITCase.java b/flink-end-to-end-tests/flink-glue-schema-registry-avro-test/src/test/java/org/apache/flink/glue/schema/registry/test/GlueSchemaRegistryAvroKinesisITCase.java index 356c896e19855..3a41abc2cd7c5 100644 --- a/flink-end-to-end-tests/flink-glue-schema-registry-avro-test/src/test/java/org/apache/flink/glue/schema/registry/test/GlueSchemaRegistryAvroKinesisITCase.java +++ b/flink-end-to-end-tests/flink-glue-schema-registry-avro-test/src/test/java/org/apache/flink/glue/schema/registry/test/GlueSchemaRegistryAvroKinesisITCase.java @@ -35,7 +35,6 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.junit.After; -import org.junit.Assert; import org.junit.Assume; import org.junit.Before; import org.junit.ClassRule; @@ -60,6 +59,7 @@ import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ACCESS_KEY_ID; import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_SECRET_ACCESS_KEY; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION; +import static org.assertj.core.api.Assertions.assertThat; /** End-to-end test for Glue Schema Registry AVRO format using Kinesalite. */ public class GlueSchemaRegistryAvroKinesisITCase extends TestLogger { @@ -133,11 +133,7 @@ public void testGSRAvroGenericFormatWithFlink() throws Exception { } log.info("results: {}", results); - Assert.assertEquals( - "Results received from '" + OUTPUT_STREAM + "': " + results, - messages.size(), - results.size()); - Assert.assertTrue(messages.containsAll(results)); + assertThat(results).containsExactlyInAnyOrderElementsOf(messages); } private FlinkKinesisConsumer createSource() throws Exception { diff --git a/flink-end-to-end-tests/flink-glue-schema-registry-json-test/src/test/java/org/apache/flink/glue/schema/registry/test/json/GlueSchemaRegistryJsonKinesisITCase.java b/flink-end-to-end-tests/flink-glue-schema-registry-json-test/src/test/java/org/apache/flink/glue/schema/registry/test/json/GlueSchemaRegistryJsonKinesisITCase.java index aaa04235b66ad..c9336b9eeb246 100644 --- a/flink-end-to-end-tests/flink-glue-schema-registry-json-test/src/test/java/org/apache/flink/glue/schema/registry/test/json/GlueSchemaRegistryJsonKinesisITCase.java +++ b/flink-end-to-end-tests/flink-glue-schema-registry-json-test/src/test/java/org/apache/flink/glue/schema/registry/test/json/GlueSchemaRegistryJsonKinesisITCase.java @@ -32,7 +32,6 @@ import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema; import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; import org.junit.After; -import org.junit.Assert; import org.junit.Assume; import org.junit.Before; import org.junit.ClassRule; @@ -54,6 +53,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION; +import static org.assertj.core.api.Assertions.assertThat; /** End-to-end test for Glue Schema Registry Json format using Kinesalite. */ public class GlueSchemaRegistryJsonKinesisITCase extends TestLogger { @@ -128,11 +128,7 @@ public void testGSRJsonGenericFormatWithFlink() throws Exception { } log.info("results: {}", results); - Assert.assertEquals( - "Results received from '" + OUTPUT_STREAM + "': " + results, - messages.size(), - results.size()); - Assert.assertTrue(messages.containsAll(results)); + assertThat(results).containsExactlyInAnyOrderElementsOf(messages); } private FlinkKinesisConsumer createSource() { @@ -141,13 +137,11 @@ private FlinkKinesisConsumer createSource() { STREAM_INITIAL_POSITION, ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name()); - FlinkKinesisConsumer consumer = - new FlinkKinesisConsumer<>( - INPUT_STREAM, - new GlueSchemaRegistryJsonDeserializationSchema<>( - JsonDataWithSchema.class, INPUT_STREAM, getConfigs()), - properties); - return consumer; + return new FlinkKinesisConsumer<>( + INPUT_STREAM, + new GlueSchemaRegistryJsonDeserializationSchema<>( + JsonDataWithSchema.class, INPUT_STREAM, getConfigs()), + properties); } private FlinkKinesisProducer createSink() throws Exception { @@ -208,7 +202,7 @@ private List getGenericRecords() { } private Map getConfigs() { - Map configs = new HashMap(); + Map configs = new HashMap<>(); configs.put(AWSSchemaRegistryConstants.AWS_REGION, "ca-central-1"); configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true); diff --git a/flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroDeserializationSchemaTest.java b/flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroDeserializationSchemaTest.java index d5bb2a18ed3a0..ae233c655243a 100644 --- a/flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroDeserializationSchemaTest.java +++ b/flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroDeserializationSchemaTest.java @@ -30,9 +30,7 @@ import java.util.HashMap; import java.util.Map; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.MatcherAssert.assertThat; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link GlueSchemaRegistryAvroDeserializationSchema}. */ public class GlueSchemaRegistryAvroDeserializationSchemaTest extends TestLogger { @@ -53,22 +51,18 @@ public static void setup() throws IOException { /** Test whether forGeneric method works. */ @Test public void testForGeneric_withValidParams_succeeds() { - assertThat( - GlueSchemaRegistryAvroDeserializationSchema.forGeneric(userSchema, configs), - notNullValue()); - assertThat( - GlueSchemaRegistryAvroDeserializationSchema.forGeneric(userSchema, configs), - instanceOf(GlueSchemaRegistryAvroDeserializationSchema.class)); + assertThat(GlueSchemaRegistryAvroDeserializationSchema.forGeneric(userSchema, configs)) + .isNotNull(); + assertThat(GlueSchemaRegistryAvroDeserializationSchema.forGeneric(userSchema, configs)) + .isInstanceOf(GlueSchemaRegistryAvroDeserializationSchema.class); } /** Test whether forSpecific method works. */ @Test public void testForSpecific_withValidParams_succeeds() { - assertThat( - GlueSchemaRegistryAvroDeserializationSchema.forSpecific(User.class, configs), - notNullValue()); - assertThat( - GlueSchemaRegistryAvroDeserializationSchema.forSpecific(User.class, configs), - instanceOf(GlueSchemaRegistryAvroDeserializationSchema.class)); + assertThat(GlueSchemaRegistryAvroDeserializationSchema.forSpecific(User.class, configs)) + .isNotNull(); + assertThat(GlueSchemaRegistryAvroDeserializationSchema.forSpecific(User.class, configs)) + .isInstanceOf(GlueSchemaRegistryAvroDeserializationSchema.class); } } diff --git a/flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderTest.java b/flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderTest.java index 911bc5df99937..128d82ab3bc64 100644 --- a/flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderTest.java +++ b/flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderTest.java @@ -28,7 +28,6 @@ import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; import lombok.NonNull; import org.apache.avro.Schema; -import org.hamcrest.Matchers; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -48,9 +47,7 @@ import java.util.Map; import java.util.UUID; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.MatcherAssert.assertThat; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link GlueSchemaRegistryAvroSchemaCoder}. */ public class GlueSchemaRegistryAvroSchemaCoderTest extends TestLogger { @@ -102,14 +99,10 @@ public static void setup() throws IOException { /** Test whether constructor works. */ @Test public void testConstructor_withConfigs_succeeds() { - assertThat(new GlueSchemaRegistryAvroSchemaCoder(testTopic, configs), notNullValue()); + assertThat(new GlueSchemaRegistryAvroSchemaCoder(testTopic, configs)).isNotNull(); } - /** - * Test whether readSchema method works. - * - * @throws IOException - */ + /** Test whether readSchema method works. */ @Test public void testReadSchema_withValidParams_succeeds() throws IOException { GlueSchemaRegistryAvroSchemaCoder glueSchemaRegistryAvroSchemaCoder = @@ -117,14 +110,10 @@ public void testReadSchema_withValidParams_succeeds() throws IOException { Schema resultSchema = glueSchemaRegistryAvroSchemaCoder.readSchema(buildByteArrayInputStream()); - assertThat(resultSchema, equalTo(userSchema)); + assertThat(resultSchema).isEqualTo(userSchema); } - /** - * Test whether writeSchema method works. - * - * @throws IOException - */ + /** Test whether writeSchema method works. */ @Test public void testWriteSchema_withValidParams_succeeds() throws IOException { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); @@ -136,12 +125,7 @@ public void testWriteSchema_withValidParams_succeeds() throws IOException { testForSerializedData(outputStream.toByteArray()); } - /** - * Test whether writeSchema method throws exception if auto registration un-enabled. - * - * @throws NoSuchFieldException - * @throws IllegalAccessException - */ + /** Test whether writeSchema method throws exception if auto registration un-enabled. */ @Test public void testWriteSchema_withoutAutoRegistration_throwsException() throws IOException { configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, false); @@ -167,12 +151,12 @@ public void testWriteSchema_withoutAutoRegistration_throwsException() throws IOE } private void testForSerializedData(byte[] serializedData) { - assertThat(serializedData, Matchers.notNullValue()); + assertThat(serializedData).isNotNull(); ByteBuffer buffer = getByteBuffer(serializedData); byte headerVersionByte = getByte(buffer); - assertThat(headerVersionByte, equalTo(AWSSchemaRegistryConstants.HEADER_VERSION_BYTE)); + assertThat(headerVersionByte).isEqualTo(AWSSchemaRegistryConstants.HEADER_VERSION_BYTE); } private ByteArrayInputStream buildByteArrayInputStream() { diff --git a/flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSerializationSchemaTest.java b/flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSerializationSchemaTest.java index 766c35240200a..f5048cde3eb35 100644 --- a/flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSerializationSchemaTest.java +++ b/flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSerializationSchemaTest.java @@ -34,11 +34,7 @@ import java.util.HashMap; import java.util.Map; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.CoreMatchers.nullValue; -import static org.hamcrest.MatcherAssert.assertThat; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link GlueSchemaRegistryAvroSerializationSchema}. */ public class GlueSchemaRegistryAvroSerializationSchemaTest extends TestLogger { @@ -52,10 +48,10 @@ public class GlueSchemaRegistryAvroSerializationSchemaTest extends TestLogger { }; private static Schema userSchema; private static User userDefinedPojo; - private static Map configs = new HashMap<>(); - private static Map metadata = new HashMap<>(); + private static final Map configs = new HashMap<>(); + private static final Map metadata = new HashMap<>(); private static GlueSchemaRegistryConfiguration glueSchemaRegistryConfiguration; - private static AwsCredentialsProvider credentialsProvider = + private static final AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.builder().build(); private static GlueSchemaRegistrySerializationFacade mockSerializationFacade; @@ -87,26 +83,26 @@ public static void setup() throws IOException { @Test public void testForGeneric_withValidParams_succeeds() { assertThat( - GlueSchemaRegistryAvroSerializationSchema.forGeneric( - userSchema, testTopic, configs), - notNullValue()); + GlueSchemaRegistryAvroSerializationSchema.forGeneric( + userSchema, testTopic, configs)) + .isNotNull(); assertThat( - GlueSchemaRegistryAvroSerializationSchema.forGeneric( - userSchema, testTopic, configs), - instanceOf(GlueSchemaRegistryAvroSerializationSchema.class)); + GlueSchemaRegistryAvroSerializationSchema.forGeneric( + userSchema, testTopic, configs)) + .isInstanceOf(GlueSchemaRegistryAvroSerializationSchema.class); } /** Test whether forSpecific method works. */ @Test public void testForSpecific_withValidParams_succeeds() { assertThat( - GlueSchemaRegistryAvroSerializationSchema.forSpecific( - User.class, testTopic, configs), - notNullValue()); + GlueSchemaRegistryAvroSerializationSchema.forSpecific( + User.class, testTopic, configs)) + .isNotNull(); assertThat( - GlueSchemaRegistryAvroSerializationSchema.forSpecific( - User.class, testTopic, configs), - instanceOf(GlueSchemaRegistryAvroSerializationSchema.class)); + GlueSchemaRegistryAvroSerializationSchema.forSpecific( + User.class, testTopic, configs)) + .isInstanceOf(GlueSchemaRegistryAvroSerializationSchema.class); } /** Test whether serialize method when compression is not enabled works. */ @@ -121,13 +117,13 @@ public void testSerialize_withValidParams_withoutCompression_succeeds() { testTopic, configs, mockSerializationFacade); GlueSchemaRegistryAvroSchemaCoder glueSchemaRegistryAvroSchemaCoder = new GlueSchemaRegistryAvroSchemaCoder(glueSchemaRegistryOutputStreamSerializer); - GlueSchemaRegistryAvroSerializationSchema glueSchemaRegistryAvroSerializationSchema = - new GlueSchemaRegistryAvroSerializationSchema( + GlueSchemaRegistryAvroSerializationSchema glueSchemaRegistryAvroSerializationSchema = + new GlueSchemaRegistryAvroSerializationSchema<>( User.class, null, glueSchemaRegistryAvroSchemaCoder); byte[] serializedData = glueSchemaRegistryAvroSerializationSchema.serialize(userDefinedPojo); - assertThat(serializedData, equalTo(specificBytes)); + assertThat(serializedData).isEqualTo(specificBytes); } /** Test whether serialize method when compression is enabled works. */ @@ -142,22 +138,22 @@ public void testSerialize_withValidParams_withCompression_succeeds() { testTopic, configs, mockSerializationFacade); GlueSchemaRegistryAvroSchemaCoder glueSchemaRegistryAvroSchemaCoder = new GlueSchemaRegistryAvroSchemaCoder(glueSchemaRegistryOutputStreamSerializer); - GlueSchemaRegistryAvroSerializationSchema glueSchemaRegistryAvroSerializationSchema = - new GlueSchemaRegistryAvroSerializationSchema( + GlueSchemaRegistryAvroSerializationSchema glueSchemaRegistryAvroSerializationSchema = + new GlueSchemaRegistryAvroSerializationSchema<>( User.class, null, glueSchemaRegistryAvroSchemaCoder); byte[] serializedData = glueSchemaRegistryAvroSerializationSchema.serialize(userDefinedPojo); - assertThat(serializedData, equalTo(specificBytes)); + assertThat(serializedData).isEqualTo(specificBytes); } /** Test whether serialize method returns null when input object is null. */ @Test public void testSerialize_withNullObject_returnNull() { - GlueSchemaRegistryAvroSerializationSchema glueSchemaRegistryAvroSerializationSchema = + GlueSchemaRegistryAvroSerializationSchema glueSchemaRegistryAvroSerializationSchema = GlueSchemaRegistryAvroSerializationSchema.forSpecific( User.class, testTopic, configs); - assertThat(glueSchemaRegistryAvroSerializationSchema.serialize(null), nullValue()); + assertThat(glueSchemaRegistryAvroSerializationSchema.serialize(null)).isNull(); } private static class MockGlueSchemaRegistrySerializationFacade diff --git a/flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryInputStreamDeserializerTest.java b/flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryInputStreamDeserializerTest.java index dd0079530bd8e..f16ebb1204521 100644 --- a/flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryInputStreamDeserializerTest.java +++ b/flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryInputStreamDeserializerTest.java @@ -26,6 +26,7 @@ import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializationFacade; import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException; import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants.COMPRESSION; import lombok.NonNull; import org.apache.avro.Schema; import org.apache.avro.io.BinaryEncoder; @@ -49,11 +50,11 @@ import java.util.Map; import java.util.UUID; +import static com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants.COMPRESSION.NONE; +import static com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants.COMPRESSION_DEFAULT_BYTE; import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ACCESS_KEY_ID; import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_SECRET_ACCESS_KEY; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link GlueSchemaRegistryInputStreamDeserializer}. */ public class GlueSchemaRegistryInputStreamDeserializerTest extends TestLogger { @@ -64,10 +65,10 @@ public class GlueSchemaRegistryInputStreamDeserializerTest extends TestLogger { private static Schema userSchema; private static com.amazonaws.services.schemaregistry.common.Schema glueSchema; private static User userDefinedPojo; - private static Map configs = new HashMap<>(); - private static Map metadata = new HashMap<>(); + private static final Map configs = new HashMap<>(); + private static final Map metadata = new HashMap<>(); private static GlueSchemaRegistryCompressionHandler compressionHandler; - private static AwsCredentialsProvider credentialsProvider = + private static final AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.builder().build(); @Rule public ExpectedException thrown = ExpectedException.none(); private GlueSchemaRegistryDeserializationFacade glueSchemaRegistryDeserializationFacade; @@ -99,9 +100,8 @@ public void setup() throws IOException { public void testConstructor_withConfigs_succeeds() { GlueSchemaRegistryInputStreamDeserializer glueSchemaRegistryInputStreamDeserializer = new GlueSchemaRegistryInputStreamDeserializer(configs); - assertThat( - glueSchemaRegistryInputStreamDeserializer, - instanceOf(GlueSchemaRegistryInputStreamDeserializer.class)); + assertThat(glueSchemaRegistryInputStreamDeserializer) + .isInstanceOf(GlueSchemaRegistryInputStreamDeserializer.class); } @Test @@ -115,7 +115,7 @@ public void testDefaultAwsCredentialsProvider() throws Exception { glueSchemaRegistryInputStreamDeserializer); AwsCredentialsProvider credentialsProvider = facade.getCredentialsProvider(); - assertThat(credentialsProvider, instanceOf(DefaultCredentialsProvider.class)); + assertThat(credentialsProvider).isInstanceOf(DefaultCredentialsProvider.class); } @Test @@ -133,8 +133,8 @@ public void testAwsCredentialsProviderFromConfig() throws Exception { glueSchemaRegistryInputStreamDeserializer); AwsCredentialsProvider credentialsProvider = facade.getCredentialsProvider(); - assertThat(credentialsProvider.resolveCredentials().accessKeyId(), equalTo("ak")); - assertThat(credentialsProvider.resolveCredentials().secretAccessKey(), equalTo("sk")); + assertThat(credentialsProvider.resolveCredentials().accessKeyId()).isEqualTo("ak"); + assertThat(credentialsProvider.resolveCredentials().secretAccessKey()).isEqualTo("sk"); } /** Test whether constructor works with AWS de-serializer input. */ @@ -143,21 +143,15 @@ public void testConstructor_withDeserializer_succeeds() { GlueSchemaRegistryInputStreamDeserializer glueSchemaRegistryInputStreamDeserializer = new GlueSchemaRegistryInputStreamDeserializer( glueSchemaRegistryDeserializationFacade); - assertThat( - glueSchemaRegistryInputStreamDeserializer, - instanceOf(GlueSchemaRegistryInputStreamDeserializer.class)); + assertThat(glueSchemaRegistryInputStreamDeserializer) + .isInstanceOf(GlueSchemaRegistryInputStreamDeserializer.class); } /** Test whether getSchemaAndDeserializedStream method when compression is not enabled works. */ @Test public void testGetSchemaAndDeserializedStream_withoutCompression_succeeds() throws IOException { - AWSSchemaRegistryConstants.COMPRESSION compressionType = - AWSSchemaRegistryConstants.COMPRESSION.NONE; - compressionByte = - compressionType.equals(AWSSchemaRegistryConstants.COMPRESSION.NONE) - ? AWSSchemaRegistryConstants.COMPRESSION_DEFAULT_BYTE - : AWSSchemaRegistryConstants.COMPRESSION_BYTE; + compressionByte = COMPRESSION_DEFAULT_BYTE; compressionHandler = new GlueSchemaRegistryDefaultCompression(); ByteArrayOutputStream byteArrayOutputStream = @@ -166,17 +160,12 @@ public void testGetSchemaAndDeserializedStream_withoutCompression_succeeds() byte[] bytes = writeToExistingStream( byteArrayOutputStream, - compressionType.equals(AWSSchemaRegistryConstants.COMPRESSION.NONE) - ? encodeData(userDefinedPojo, new SpecificDatumWriter<>(userSchema)) - : compressData( - encodeData( - userDefinedPojo, - new SpecificDatumWriter<>(userSchema)))); + encodeData(userDefinedPojo, new SpecificDatumWriter<>(userSchema))); MutableByteArrayInputStream mutableByteArrayInputStream = new MutableByteArrayInputStream(); mutableByteArrayInputStream.setBuffer(bytes); glueSchemaRegistryDeserializationFacade = - new MockGlueSchemaRegistryDeserializationFacade(bytes, glueSchema, compressionType); + new MockGlueSchemaRegistryDeserializationFacade(bytes, glueSchema, NONE); GlueSchemaRegistryInputStreamDeserializer glueSchemaRegistryInputStreamDeserializer = new GlueSchemaRegistryInputStreamDeserializer( @@ -185,18 +174,14 @@ public void testGetSchemaAndDeserializedStream_withoutCompression_succeeds() glueSchemaRegistryInputStreamDeserializer.getSchemaAndDeserializedStream( mutableByteArrayInputStream); - assertThat(resultSchema.toString(), equalTo(glueSchema.getSchemaDefinition())); + assertThat(resultSchema.toString()).isEqualTo(glueSchema.getSchemaDefinition()); } /** Test whether getSchemaAndDeserializedStream method when compression is enabled works. */ @Test public void testGetSchemaAndDeserializedStream_withCompression_succeeds() throws IOException { - AWSSchemaRegistryConstants.COMPRESSION compressionType = - AWSSchemaRegistryConstants.COMPRESSION.ZLIB; - compressionByte = - compressionType.equals(AWSSchemaRegistryConstants.COMPRESSION.NONE) - ? AWSSchemaRegistryConstants.COMPRESSION_DEFAULT_BYTE - : AWSSchemaRegistryConstants.COMPRESSION_BYTE; + COMPRESSION compressionType = COMPRESSION.ZLIB; + compressionByte = AWSSchemaRegistryConstants.COMPRESSION_BYTE; compressionHandler = new GlueSchemaRegistryDefaultCompression(); ByteArrayOutputStream byteArrayOutputStream = @@ -205,12 +190,9 @@ public void testGetSchemaAndDeserializedStream_withCompression_succeeds() throws byte[] bytes = writeToExistingStream( byteArrayOutputStream, - compressionType.equals(AWSSchemaRegistryConstants.COMPRESSION.NONE) - ? encodeData(userDefinedPojo, new SpecificDatumWriter<>(userSchema)) - : compressData( - encodeData( - userDefinedPojo, - new SpecificDatumWriter<>(userSchema)))); + compressData( + encodeData( + userDefinedPojo, new SpecificDatumWriter<>(userSchema)))); MutableByteArrayInputStream mutableByteArrayInputStream = new MutableByteArrayInputStream(); mutableByteArrayInputStream.setBuffer(bytes); @@ -224,7 +206,7 @@ public void testGetSchemaAndDeserializedStream_withCompression_succeeds() throws glueSchemaRegistryInputStreamDeserializer.getSchemaAndDeserializedStream( mutableByteArrayInputStream); - assertThat(resultSchema.toString(), equalTo(glueSchema.getSchemaDefinition())); + assertThat(resultSchema.toString()).isEqualTo(glueSchema.getSchemaDefinition()); } /** Test whether getSchemaAndDeserializedStream method throws exception with invalid schema. */ @@ -248,8 +230,7 @@ public void testGetSchemaAndDeserializedStream_withWrongSchema_throwsException() new com.amazonaws.services.schemaregistry.common.Schema( schemaDefinition, DataFormat.AVRO.name(), testTopic); glueSchemaRegistryDeserializationFacade = - new MockGlueSchemaRegistryDeserializationFacade( - new byte[20], glueSchema, AWSSchemaRegistryConstants.COMPRESSION.NONE); + new MockGlueSchemaRegistryDeserializationFacade(new byte[20], glueSchema, NONE); GlueSchemaRegistryInputStreamDeserializer awsSchemaRegistryInputStreamDeserializer = new GlueSchemaRegistryInputStreamDeserializer( glueSchemaRegistryDeserializationFacade); @@ -299,14 +280,14 @@ private byte[] compressData(byte[] actualDataBytes) throws IOException { private static class MockGlueSchemaRegistryDeserializationFacade extends GlueSchemaRegistryDeserializationFacade { - private byte[] bytes; - private com.amazonaws.services.schemaregistry.common.Schema schema; - private AWSSchemaRegistryConstants.COMPRESSION compressionType; + private final byte[] bytes; + private final com.amazonaws.services.schemaregistry.common.Schema schema; + private final COMPRESSION compressionType; public MockGlueSchemaRegistryDeserializationFacade( byte[] bytes, com.amazonaws.services.schemaregistry.common.Schema schema, - AWSSchemaRegistryConstants.COMPRESSION compressionType) { + COMPRESSION compressionType) { super(configs, null, credentialsProvider, null); this.bytes = bytes; this.schema = schema; diff --git a/flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryOutputStreamSerializerTest.java b/flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryOutputStreamSerializerTest.java index 1f8423bd75b83..8a9fa5ac75d76 100644 --- a/flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryOutputStreamSerializerTest.java +++ b/flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryOutputStreamSerializerTest.java @@ -35,9 +35,7 @@ import java.util.HashMap; import java.util.Map; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.instanceOf; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link GlueSchemaRegistryOutputStreamSerializer}. */ public class GlueSchemaRegistryOutputStreamSerializerTest extends TestLogger { @@ -52,9 +50,9 @@ public class GlueSchemaRegistryOutputStreamSerializerTest extends TestLogger { }; private static Schema userSchema; private static User userDefinedPojo; - private static Map configs = new HashMap<>(); - private static Map metadata = new HashMap<>(); - private static AwsCredentialsProvider credentialsProvider = + private static final Map configs = new HashMap<>(); + private static final Map metadata = new HashMap<>(); + private static final AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.builder().build(); private static GlueSchemaRegistrySerializationFacade mockSerializationFacade; @@ -87,9 +85,8 @@ public static void setup() throws IOException { public void testConstructor_withConfigsAndCredential_succeeds() { GlueSchemaRegistryOutputStreamSerializer glueSchemaRegistryOutputStreamSerializer = new GlueSchemaRegistryOutputStreamSerializer(testTopic, configs); - assertThat( - glueSchemaRegistryOutputStreamSerializer, - instanceOf(GlueSchemaRegistryOutputStreamSerializer.class)); + assertThat(glueSchemaRegistryOutputStreamSerializer) + .isInstanceOf(GlueSchemaRegistryOutputStreamSerializer.class); } /** Test whether constructor works with Glue Schema Registry SerializationFacade. */ @@ -98,9 +95,8 @@ public void testConstructor_withDeserializer_succeeds() { GlueSchemaRegistryOutputStreamSerializer glueSchemaRegistryOutputStreamSerializer = new GlueSchemaRegistryOutputStreamSerializer( testTopic, configs, mockSerializationFacade); - assertThat( - glueSchemaRegistryOutputStreamSerializer, - instanceOf(GlueSchemaRegistryOutputStreamSerializer.class)); + assertThat(glueSchemaRegistryOutputStreamSerializer) + .isInstanceOf(GlueSchemaRegistryOutputStreamSerializer.class); } /** Test whether registerSchemaAndSerializeStream method works. */ @@ -113,7 +109,7 @@ public void testRegisterSchemaAndSerializeStream_withValidParams_succeeds() thro glueSchemaRegistryOutputStreamSerializer.registerSchemaAndSerializeStream( userSchema, outputStream, actualBytes); - assertThat(outputStream.toByteArray(), equalTo(specificBytes)); + assertThat(outputStream.toByteArray()).isEqualTo(specificBytes); } private static class MockGlueSchemaRegistrySerializationFacade diff --git a/flink-formats/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonDeserializationSchemaTest.java b/flink-formats/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonDeserializationSchemaTest.java index ef5e066260c8e..d796bf0b1eb53 100644 --- a/flink-formats/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonDeserializationSchemaTest.java +++ b/flink-formats/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonDeserializationSchemaTest.java @@ -37,17 +37,13 @@ import java.util.HashMap; import java.util.Map; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.CoreMatchers.nullValue; -import static org.hamcrest.MatcherAssert.assertThat; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link GlueSchemaRegistryJsonDeserializationSchema}. */ public class GlueSchemaRegistryJsonDeserializationSchemaTest { private static final String testTopic = "Test-Topic"; private static final Map configs = new HashMap<>(); - private static AwsCredentialsProvider credentialsProvider = + private static final AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.builder().build(); private static final byte[] serializedBytes = new byte[] { @@ -100,24 +96,22 @@ public static void setup() { @Test public void testForGeneric_withValidParams_succeeds() { assertThat( - new GlueSchemaRegistryJsonDeserializationSchema<>( - JsonDataWithSchema.class, testTopic, configs), - notNullValue()); + new GlueSchemaRegistryJsonDeserializationSchema<>( + JsonDataWithSchema.class, testTopic, configs)) + .isNotNull(); assertThat( - new GlueSchemaRegistryJsonDeserializationSchema<>( - JsonDataWithSchema.class, testTopic, configs), - instanceOf(GlueSchemaRegistryJsonDeserializationSchema.class)); + new GlueSchemaRegistryJsonDeserializationSchema<>( + JsonDataWithSchema.class, testTopic, configs)) + .isInstanceOf(GlueSchemaRegistryJsonDeserializationSchema.class); } /** Test initialization for specific type JSON Schema works. */ @Test public void testForSpecific_withValidParams_succeeds() { - assertThat( - new GlueSchemaRegistryJsonDeserializationSchema<>(Car.class, testTopic, configs), - notNullValue()); - assertThat( - new GlueSchemaRegistryJsonDeserializationSchema<>(Car.class, testTopic, configs), - instanceOf(GlueSchemaRegistryJsonDeserializationSchema.class)); + assertThat(new GlueSchemaRegistryJsonDeserializationSchema<>(Car.class, testTopic, configs)) + .isNotNull(); + assertThat(new GlueSchemaRegistryJsonDeserializationSchema<>(Car.class, testTopic, configs)) + .isInstanceOf(GlueSchemaRegistryJsonDeserializationSchema.class); } /** Test whether deserialize method for specific type JSON Schema data works. */ @@ -134,8 +128,8 @@ public void testDeserializePOJO_withValidParams_succeeds() { Object deserializedObject = glueSchemaRegistryJsonDeserializationSchema.deserialize(serializedBytes); - assertThat(deserializedObject, instanceOf(Car.class)); - assertThat(deserializedObject, is(userDefinedPojo)); + assertThat(deserializedObject).isInstanceOf(Car.class); + assertThat(deserializedObject).isEqualTo(userDefinedPojo); } /** Test whether deserialize method for generic type JSON Schema data works. */ @@ -152,8 +146,8 @@ public void testDeserializeGenericData_withValidParams_succeeds() { Object deserializedObject = glueSchemaRegistryJsonDeserializationSchema.deserialize(serializedBytes); - assertThat(deserializedObject, instanceOf(JsonDataWithSchema.class)); - assertThat(deserializedObject, is(userSchema)); + assertThat(deserializedObject).isInstanceOf(JsonDataWithSchema.class); + assertThat(deserializedObject).isEqualTo(userSchema); } /** Test whether deserialize method returns null when input byte array is null. */ @@ -163,7 +157,7 @@ public void testDeserialize_withNullObject_returnNull() { glueSchemaRegistryJsonDeserializationSchema = new GlueSchemaRegistryJsonDeserializationSchema<>( Car.class, testTopic, configs); - assertThat(glueSchemaRegistryJsonDeserializationSchema.deserialize(null), nullValue()); + assertThat(glueSchemaRegistryJsonDeserializationSchema.deserialize(null)).isNull(); } private static class MockGlueSchemaRegistrySerializationFacadeForSpecific diff --git a/flink-formats/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonSchemaCoderTest.java b/flink-formats/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonSchemaCoderTest.java index ab226a598971a..220198678c6ea 100644 --- a/flink-formats/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonSchemaCoderTest.java +++ b/flink-formats/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonSchemaCoderTest.java @@ -30,9 +30,7 @@ import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ACCESS_KEY_ID; import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_SECRET_ACCESS_KEY; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link GlueSchemaRegistryJsonSchemaCoder}. */ public class GlueSchemaRegistryJsonSchemaCoderTest { @@ -46,7 +44,7 @@ public void testDefaultAwsCredentialsProvider() throws Exception { getField("glueSchemaRegistryDeserializationFacade", coder); AwsCredentialsProvider credentialsProvider = facade.getCredentialsProvider(); - assertThat(credentialsProvider, instanceOf(DefaultCredentialsProvider.class)); + assertThat(credentialsProvider).isInstanceOf(DefaultCredentialsProvider.class); } @Test @@ -62,8 +60,8 @@ public void testAwsCredentialsProviderFromConfig() throws Exception { getField("glueSchemaRegistryDeserializationFacade", coder); AwsCredentialsProvider credentialsProvider = facade.getCredentialsProvider(); - assertThat(credentialsProvider.resolveCredentials().accessKeyId(), equalTo("ak")); - assertThat(credentialsProvider.resolveCredentials().secretAccessKey(), equalTo("sk")); + assertThat(credentialsProvider.resolveCredentials().accessKeyId()).isEqualTo("ak"); + assertThat(credentialsProvider.resolveCredentials().secretAccessKey()).isEqualTo("sk"); } private T getField(final String fieldName, final Object instance) throws Exception { diff --git a/flink-formats/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonSerializationSchemaTest.java b/flink-formats/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonSerializationSchemaTest.java index 8c4b1bba4f155..6d07e7039bf33 100644 --- a/flink-formats/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonSerializationSchemaTest.java +++ b/flink-formats/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonSerializationSchemaTest.java @@ -39,11 +39,8 @@ import java.util.Map; import java.util.UUID; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.CoreMatchers.nullValue; -import static org.hamcrest.MatcherAssert.assertThat; +import static com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants.COMPRESSION.NONE; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link GlueSchemaRegistryJsonSerializationSchema}. */ public class GlueSchemaRegistryJsonSerializationSchemaTest { @@ -67,10 +64,10 @@ public class GlueSchemaRegistryJsonSerializationSchemaTest { private static final UUID schemaVersionId = UUID.randomUUID(); private static JsonDataWithSchema userSchema; private static Car userDefinedPojo; - private static Map configs = new HashMap<>(); - private static Map metadata = new HashMap<>(); + private static final Map configs = new HashMap<>(); + private static final Map metadata = new HashMap<>(); private static GlueSchemaRegistryConfiguration glueSchemaRegistryConfiguration; - private static AwsCredentialsProvider credentialsProvider = + private static final AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.builder().build(); private static GlueSchemaRegistrySerializationFacade mockSerializationFacade; @@ -107,12 +104,9 @@ public static void setup() { /** Test initialization works. */ @Test public void testForGeneric_withValidParams_succeeds() { - assertThat( - new GlueSchemaRegistryJsonSerializationSchema<>(testTopic, configs), - notNullValue()); - assertThat( - new GlueSchemaRegistryJsonSerializationSchema<>(testTopic, configs), - instanceOf(GlueSchemaRegistryJsonSerializationSchema.class)); + assertThat(new GlueSchemaRegistryJsonSerializationSchema<>(testTopic, configs)).isNotNull(); + assertThat(new GlueSchemaRegistryJsonSerializationSchema<>(testTopic, configs)) + .isInstanceOf(GlueSchemaRegistryJsonSerializationSchema.class); } /** @@ -121,20 +115,18 @@ public void testForGeneric_withValidParams_succeeds() { */ @Test public void testSerializePOJO_withValidParams_withoutCompression_succeeds() { - AWSSchemaRegistryConstants.COMPRESSION compressionType = - AWSSchemaRegistryConstants.COMPRESSION.NONE; - configs.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, compressionType.name()); + configs.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, NONE.name()); GlueSchemaRegistryJsonSchemaCoder glueSchemaRegistryJsonSchemaCoder = new GlueSchemaRegistryJsonSchemaCoder( testTopic, configs, mockSerializationFacade, null); - GlueSchemaRegistryJsonSerializationSchema glueSchemaRegistryJsonSerializationSchema = + GlueSchemaRegistryJsonSerializationSchema glueSchemaRegistryJsonSerializationSchema = new GlueSchemaRegistryJsonSerializationSchema<>(glueSchemaRegistryJsonSchemaCoder); byte[] serializedData = glueSchemaRegistryJsonSerializationSchema.serialize(userDefinedPojo); - assertThat(serializedData, equalTo(serializedBytes)); + assertThat(serializedData).isEqualTo(serializedBytes); } /** @@ -143,19 +135,19 @@ public void testSerializePOJO_withValidParams_withoutCompression_succeeds() { */ @Test public void testSerializeGenericData_withValidParams_withoutCompression_succeeds() { - AWSSchemaRegistryConstants.COMPRESSION compressionType = - AWSSchemaRegistryConstants.COMPRESSION.NONE; - configs.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, compressionType.name()); + configs.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, NONE.name()); GlueSchemaRegistryJsonSchemaCoder glueSchemaRegistryJsonSchemaCoder = new GlueSchemaRegistryJsonSchemaCoder( testTopic, configs, mockSerializationFacade, null); - GlueSchemaRegistryJsonSerializationSchema glueSchemaRegistryJsonSerializationSchema = - new GlueSchemaRegistryJsonSerializationSchema<>(glueSchemaRegistryJsonSchemaCoder); + GlueSchemaRegistryJsonSerializationSchema + glueSchemaRegistryJsonSerializationSchema = + new GlueSchemaRegistryJsonSerializationSchema<>( + glueSchemaRegistryJsonSchemaCoder); byte[] serializedData = glueSchemaRegistryJsonSerializationSchema.serialize(userSchema); - assertThat(serializedData, equalTo(serializedBytes)); + assertThat(serializedData).isEqualTo(serializedBytes); } /** @@ -172,12 +164,12 @@ public void testSerializePOJO_withValidParams_withCompression_succeeds() { new GlueSchemaRegistryJsonSchemaCoder( testTopic, configs, mockSerializationFacade, null); - GlueSchemaRegistryJsonSerializationSchema glueSchemaRegistryJsonSerializationSchema = + GlueSchemaRegistryJsonSerializationSchema glueSchemaRegistryJsonSerializationSchema = new GlueSchemaRegistryJsonSerializationSchema<>(glueSchemaRegistryJsonSchemaCoder); byte[] serializedData = glueSchemaRegistryJsonSerializationSchema.serialize(userDefinedPojo); - assertThat(serializedData, equalTo(serializedBytes)); + assertThat(serializedData).isEqualTo(serializedBytes); } /** @@ -194,19 +186,21 @@ public void testSerializeGenericData_withValidParams_withCompression_succeeds() new GlueSchemaRegistryJsonSchemaCoder( testTopic, configs, mockSerializationFacade, null); - GlueSchemaRegistryJsonSerializationSchema glueSchemaRegistryJsonSerializationSchema = - new GlueSchemaRegistryJsonSerializationSchema<>(glueSchemaRegistryJsonSchemaCoder); + GlueSchemaRegistryJsonSerializationSchema + glueSchemaRegistryJsonSerializationSchema = + new GlueSchemaRegistryJsonSerializationSchema<>( + glueSchemaRegistryJsonSchemaCoder); byte[] serializedData = glueSchemaRegistryJsonSerializationSchema.serialize(userSchema); - assertThat(serializedData, equalTo(serializedBytes)); + assertThat(serializedData).isEqualTo(serializedBytes); } /** Test whether serialize method returns null when input object is null. */ @Test public void testSerialize_withNullObject_returnNull() { - GlueSchemaRegistryJsonSerializationSchema glueSchemaRegistryJsonSerializationSchema = + GlueSchemaRegistryJsonSerializationSchema glueSchemaRegistryJsonSerializationSchema = new GlueSchemaRegistryJsonSerializationSchema<>(testTopic, configs); - assertThat(glueSchemaRegistryJsonSerializationSchema.serialize(null), nullValue()); + assertThat(glueSchemaRegistryJsonSerializationSchema.serialize(null)).isNull(); } private static class MockGlueSchemaRegistrySerializationFacade