Skip to content

Commit

Permalink
feat: allow to configure value schema using Avro
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Mar 6, 2023
1 parent 19cbee2 commit 18b1a73
Show file tree
Hide file tree
Showing 25 changed files with 1,388 additions and 117 deletions.
4 changes: 4 additions & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
<suppress checks="CyclomaticComplexity" files="Schema.java"/>
<suppress checks="CyclomaticComplexity" files="FilePulseSourceTask.java"/>
<suppress checks="LineLength" files="KafkaStateBackingStore.java"/>
<suppress checks="NPathComplexity" files="KafkaStateBackingStore.java"/>
<suppress checks="NPathComplexity" files="DefaultFileRecordsPollingConsumer.java"/>
<suppress checks="NPathComplexity" files="DefaultFileSystemMonitor.java"/>
<suppress checks="ParameterNumber" files="InternalFilterContext" />
Expand All @@ -65,4 +66,7 @@
<!-- Classes used for configuration -->
<suppress checks="(MethodLength|LineLength|CyclomaticComplexity|NPathComplexity|JavaNCSS)" files=".*Config.java"/>
<suppress checks="(LineLength|CyclomaticComplexity|NPathComplexity|JavaNCSS)" files="SchemaMerger.java"/>

<suppress checks="NPathComplexity" files="AbstractConnectSchemaConverter.java"/>
<suppress checks="JavaNCSS" files="ConfigSchema.java"/>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

public class StructSchema implements Schema, Iterable<TypedField> {

Expand Down Expand Up @@ -107,7 +106,9 @@ public TypedField field(final String fieldName) {
}

/**
 * Returns all fields of this schema, ordered by field name.
 * Sorting keeps downstream schema-merge operations as idempotent as possible.
 *
 * @return a new mutable {@link List} of {@link TypedField}; never {@code null}.
 */
public List<TypedField> fields() {
    // Copy before sorting so the backing map's values collection is never mutated.
    ArrayList<TypedField> ordered = new ArrayList<>(fields.values());
    ordered.sort(Comparator.comparing(TypedField::name));
    return ordered;
}

void set(final String fieldName, final Schema fieldSchema) {
Expand Down Expand Up @@ -151,11 +152,7 @@ TypedField remove(final String fieldName) {
*/
@Override
public Iterator<TypedField> iterator() {
    // Delegates to fields() so iteration follows the same name-sorted order.
    return fields().iterator();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2023 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.connect.filepulse.internal;
import java.util.Objects;

/**
 * A simple immutable holder for two related values.
 *
 * @param <K> the type of the first element.
 * @param <V> the type of the second element.
 */
public class Pair<K, V> {

    private final K key;
    private final V value;

    /**
     * Creates a new {@link Pair} instance.
     *
     * @param key   the first element of the pair; may be {@code null}.
     * @param value the second element of the pair; may be {@code null}.
     */
    public Pair(K key, V value) {
        this.key = key;
        this.value = value;
    }

    /**
     * @return the first element of this pair.
     */
    public K getKey() {
        return key;
    }

    /**
     * @return the second element of this pair.
     */
    public V getValue() {
        return value;
    }

    /** {@inheritDoc} **/
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        final Pair<?, ?> that = (Pair<?, ?>) o;
        return Objects.equals(this.key, that.key)
                && Objects.equals(this.value, that.value);
    }

    /** {@inheritDoc} **/
    @Override
    public int hashCode() {
        return Objects.hash(key, value);
    }

    /** {@inheritDoc} **/
    @Override
    public String toString() {
        return String.format("Pair{key=%s, value=%s}", key, value);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ private static Schema mergeStruct(final Schema left,
final Schema right,
final SchemaContext context) {

if (!Objects.equals(left.name(), right.name()))
throw new DataException("Cannot merge two schemas wih different name " + left.name() + "<>" + right.name());
if (left.name() != null && right.name() != null) {
if (!Objects.equals(left.name(), right.name()))
throw new DataException("Cannot merge two schemas wih different name " + left.name() + "<>" + right.name());
}

final SchemaBuilder merged = mergeMetadata(left, right, new SchemaBuilder(Type.STRUCT));

Expand Down Expand Up @@ -169,10 +171,11 @@ private static Schema mergeStruct(final Schema left,
// Remaining fields only exist on LEFT schema.
fieldSchemas.putAll(remaining);

// Fields should be added ordered by name to make schema merge operation as idempotent as possible.
// Fields should be added ordered by name to make
// schema merge operation as idempotent as possible.
fieldSchemas.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.forEach(it -> merged.field(it.getKey(), context.buildSchemaWithCyclicSchemaWrapper(it.getValue())));
.forEachOrdered(it -> merged.field(it.getKey(), context.buildSchemaWithCyclicSchemaWrapper(it.getValue())));

return context.buildSchemaWithCyclicSchemaWrapper(merged.build());
}
Expand All @@ -181,8 +184,8 @@ private static SchemaBuilder mergeMetadata(final Schema left,
final Schema right,
final SchemaBuilder merged) {

merged.name(left.name());
merged.doc(left.doc());
merged.name(left.name() != null ? left.name() : right.name());
merged.doc(left.doc() != null ? left.doc() : right.doc());

if (left.isOptional() || right.isOptional()) {
merged.optional();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ public SchemaAndValue map(final Schema connectSchema, final TypedStruct value) {

private static Struct toConnectStruct(final Schema connectSchema, final TypedStruct struct) {
final Struct connectStruct = new Struct(connectSchema);
for (Field connectField : connectSchema.fields()) {

for (Field connectField : connectSchema.fields()) {
final String recordName = connectSchema.name();
final String fieldName = connectField.name();

Expand Down
10 changes: 10 additions & 0 deletions connect-file-pulse-dataformat/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,23 @@
<artifactId>kafka-connect-filepulse-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.jsoniter</groupId>
<artifactId>jsoniter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2023 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.connect.filepulse.avro;

import io.streamthoughts.kafka.connect.filepulse.avro.internal.ConnectSchemaConverter;
import io.streamthoughts.kafka.connect.filepulse.avro.internal.ConnectSchemaConverters;
import io.streamthoughts.kafka.connect.filepulse.internal.StringUtils;
import org.apache.kafka.connect.data.Schema;

import java.util.HashMap;
import java.util.Map;

/**
* Utilities for converting between Connect runtime data format and Avro.
*/
public final class AvroSchemaConverter {

    // Memoizes conversions per Avro schema instance. NOTE(review): not thread-safe;
    // assumes single-threaded use per converter instance — confirm against callers.
    private final Map<org.apache.avro.Schema, Schema> toConnectSchemaCache;

    /**
     * Creates a new {@link AvroSchemaConverter} instance.
     */
    public AvroSchemaConverter() {
        toConnectSchemaCache = new HashMap<>();
    }

    /**
     * Converts the given string Avro schema into a Connect one.
     *
     * @param schema the Avro schema string to be converted; may be {@code null} or blank.
     * @return the corresponding {@link Schema}, or {@code null} if the given schema is blank.
     */
    public Schema toConnectSchema(final String schema) {
        if (StringUtils.isBlank(schema)) {
            return null;
        }
        org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
        return toConnectSchema(parser.parse(schema));
    }

    /**
     * Converts the given {@link org.apache.avro.Schema} into a Connect one.
     * Results are cached per Avro schema instance.
     *
     * @param schema the Avro schema to be converted.
     * @return the corresponding {@link Schema}.
     */
    public Schema toConnectSchema(final org.apache.avro.Schema schema) {
        // computeIfAbsent replaces the manual get/put cache idiom. Safe here:
        // cycles are resolved inside a single conversion via CyclicContext, so
        // the mapping function never re-enters this cache.
        return toConnectSchemaCache.computeIfAbsent(schema, avroSchema ->
            ConnectSchemaConverters.forType(avroSchema.getType()).toConnectSchema(
                avroSchema,
                new ConnectSchemaConverter.Options().forceOptional(false),
                new ConnectSchemaConverter.CyclicContext()
            )
        );
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2023 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.connect.filepulse.avro;

import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;

/**
 * Thrown when an Avro schema type has no supported Connect equivalent.
 */
public class UnsupportedAvroTypeException extends ConnectFilePulseException {

    /**
     * Creates a new {@link UnsupportedAvroTypeException} instance.
     *
     * @param message the detail message describing the unsupported type.
     */
    public UnsupportedAvroTypeException(final String message) {
        super(message);
    }

    /**
     * Creates a new {@link UnsupportedAvroTypeException} instance with a root cause.
     *
     * @param message the detail message describing the unsupported type.
     * @param cause   the underlying cause of the failure; preserved for diagnostics.
     */
    public UnsupportedAvroTypeException(final String message, final Throwable cause) {
        super(message, cause);
    }

}
Loading

0 comments on commit 18b1a73

Please sign in to comment.