Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KafkaCompanion does not configure Serializer/Deserializer #2756

Open
diversit opened this issue Sep 16, 2024 · 3 comments
Open

KafkaCompanion does not configure Serializer/Deserializer #2756

diversit opened this issue Sep 16, 2024 · 3 comments

Comments

@diversit
Copy link
Contributor

diversit commented Sep 16, 2024

The KafkaCompanion does not properly configure a registered Serde, Serializer and/or Deserialiser which causes issues with certain implementations.

E.g. When using Confluents io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde (and it's serializer/deserializer) not properly calling configure causes a test to fail with:

java.lang.AssertionError: Expected a completion event but got a failure: org.apache.kafka.common.errors.InvalidConfigurationException: You must configure() before serialize() or use serializer constructor with SchemaRegistryClient

This behaviour can be verified by adding a test case to SerdesTest:

    @Test
    void testRegisteredSerdeShouldBeConfigured() {
        // Custom Serde which Serialize/Deserializer throw exception if not configured
        companion.registerSerde(Person.class, new Serde<>() {

            // Custom Deserializer which throws an exception if not configured
            private PersonDeserializer personDeserializer = new PersonDeserializer() {
                private boolean isConfigured = false;

                @Override
                public void configure(Map<String, ?> configs, boolean isKey) {
                    isConfigured = true;
                }

                @Override
                public Person deserialize(String s, byte[] bytes) {
                    if (isConfigured) {
                        return super.deserialize(s, bytes);
                    } else {
                        throw new SerializationException("Deserializer not configured");
                    }
                }
            };

            // Custom Serializer which throws an exception if not configured
            private PersonSerializer personSerializer = new PersonSerializer() {
                private boolean isConfigured = false;

                @Override
                public void configure(Map<String, ?> configs, boolean isKey) {
                    isConfigured = true;
                }

                @Override
                public byte[] serialize(String s, Person person) {
                    if (isConfigured) {
                        return super.serialize(s, person);
                    } else {
                        throw new SerializationException("Serializer not configured");
                    }
                }
            };

            @Override
            public Serializer<Person> serializer() {
                return personSerializer;
            }

            @Override
            public Deserializer<Person> deserializer() {
                return personDeserializer;
            }

            @Override
            public void configure(Map<String, ?> configs, boolean isKey) {
                // Configure the serializer and deserializer
                personSerializer.configure(configs, isKey);
                personDeserializer.configure(configs, isKey);
            }
        });

        companion.produce(Person.class).fromRecords(
                new ProducerRecord<>(topic, new Person("1", 30)),
                new ProducerRecord<>(topic, new Person("2", 25)),
                new ProducerRecord<>(topic, new Person("3", 18))).awaitCompletion();

        ConsumerBuilder<String, Person> consumer = companion.consumeWithDeserializers(PersonDeserializer.class.getName());
        ConsumerTask<String, Person> task = consumer.fromTopics(topic, 3).awaitCompletion();
        assertThat(task.getRecords()).hasSize(3);
    }

which will currently fail with

java.lang.AssertionError: Expected a completion event but got a failure: org.apache.kafka.common.errors.SerializationException: Serializer not configured

KafkaCompanion should be updated to call configure on the Serde so the Serializer and Deserialiser properly get configured before a message is published or consumed.

Due to this issue, other serialisers like the StringSerializer, UUIDSerializer, ListSerializer, etc could also give unexpected results in tests.

Provided a fix in PR #2757

diversit added a commit to diversit/smallrye-reactive-messaging that referenced this issue Sep 16, 2024
diversit added a commit to diversit/smallrye-reactive-messaging that referenced this issue Sep 16, 2024
…gure the (de)serializers before creating the KafkaConsumer/KafkaProducer instances.
@ozangunalp
Copy link
Collaborator

@diversit Thank you for opening this detailed issue and the PR.

This was intentional, following the typical Kafka client behavior.
If you pass the serde type name, configure will be called by the client constructor. I believe there was a javadoc note on that.

That being said, I think configure could've called the for creators from registered serde. Would that make sense for you ?

@diversit
Copy link
Contributor Author

@ozangunalp Thanks for your reply.

I understand your point. I was in the understanding the SerDe would required configuration provided by the KafaCompanion and I thought the getCommonClientConfig() would not be sufficient.

From the docs registering custom serdes I had not understood I had to configure the SerDe myself.
And I'm having trouble finding the Javadocs for the SmallRye projects since it does not seem to be linked from the SmallRye site or Github repo's.

Could you please point me to the Javadoc note your mentioned?

After better reading the docs of SpecificAvroSerde I know understand I have to configure it myself with the appropriate config, before registering it with the KafkaCompanion.
Next challenge will then be how to get the url from the Schema Registry from Quarkus in dev/test modes.

So I guess this issue can be closed then.

@ozangunalp
Copy link
Collaborator

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants