fix(api): fix NPE due to empty array-value schema for complex type (#53, #54)

This commit also adds automatic schema name generation based on the field name.

Resolves: GH-53, GH-54
fhussonnois committed Aug 6, 2020
1 parent fe53bf4 commit c2fbea2
Showing 9 changed files with 75 additions and 20 deletions.
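For context, here is a minimal reproduction distilled from the regression test added by this commit (shouldConvertGivenFieldWithArrayOfComplexType): an array whose elements are JSON objects previously ended up with a null element schema that was later dereferenced, hence the NPE; after the fix, the element schema is inferred lazily from the first element.

    import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
    import io.streamthoughts.kafka.connect.filepulse.json.DefaultJSONStructConverter;
    import io.streamthoughts.kafka.connect.filepulse.json.JSONStructConverter;

    public class ArrayOfComplexTypeRepro {

        public static void main(String[] args) throws Exception {
            JSONStructConverter converter = new DefaultJSONStructConverter();
            // An array of complex (object) values: previously this yielded an
            // array schema wrapping a null value schema, which blew up once
            // the schema was traversed (e.g. when mapped to a Connect schema).
            TypedStruct struct = converter.readJson(
                    "{\"field-one\" : [{\"firstName\": \"foo\"}, {\"firstName\": \"bar\"}]}");
            System.out.println(struct.schema().field("field-one").schema().type()); // ARRAY
        }
    }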
ArraySchema.java
@@ -29,7 +29,7 @@ public class ArraySchema implements Schema {
     private Integer hash;
 
     /**
-     * Creates a new MapSchema for the specified type.
+     * Creates a new ArraySchema for the specified type.
      *
      * @param valueSchema the {@link Schema} instance.
      */
LazyArraySchema.java
@@ -18,11 +18,12 @@
  */
 package io.streamthoughts.kafka.connect.filepulse.data;
 
+import java.util.Collection;
 import java.util.List;
 
 public class LazyArraySchema extends ArraySchema implements Schema {
 
-    private final List list;
+    private final Collection<?> list;
 
     private Schema valueSchema;
 
@@ -31,7 +32,7 @@ public class LazyArraySchema extends ArraySchema implements Schema {
      *
      * @param list the {@link List} instance.
      */
-    LazyArraySchema(final List list) {
+    LazyArraySchema(final Collection<?> list) {
         super(null);
         this.list = list;
 
@@ -46,7 +47,7 @@ public Schema valueSchema() {
            if (list.isEmpty()) {
                throw new DataException("Cannot infer value type because LIST is empty");
            }
-           valueSchema = SchemaSupplier.lazy(list.get(0)).get();
+           valueSchema = SchemaSupplier.lazy(list.iterator().next()).get();
        }
        return valueSchema;
    }
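The idea of the change above: defer element-schema inference until the schema is actually requested, and widen the backing type from List to Collection<?> — which has no get(int), hence list.iterator().next(). A self-contained analogue of the semantics (illustration only, not the project's classes):

    import java.util.Collection;
    import java.util.List;

    class LazyElementType {

        private final Collection<?> values;
        private Class<?> elementType;

        LazyElementType(final Collection<?> values) {
            this.values = values; // nothing inspected yet: construction always succeeds
        }

        Class<?> elementType() {
            if (elementType == null) {
                if (values.isEmpty()) {
                    // Mirrors the DataException above: an explicit error, not an NPE.
                    throw new IllegalStateException("Cannot infer value type because LIST is empty");
                }
                // iterator().next() works for any Collection, unlike List.get(0).
                elementType = values.iterator().next().getClass();
            }
            return elementType;
        }

        public static void main(String[] args) {
            System.out.println(new LazyElementType(List.of("a")).elementType()); // class java.lang.String
        }
    }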
Schema.java
@@ -18,6 +18,9 @@
  */
 package io.streamthoughts.kafka.connect.filepulse.data;
 
+import java.util.Collection;
+import java.util.Map;
+
 import static io.streamthoughts.kafka.connect.filepulse.data.SimpleSchema.SCHEMA_BOOLEAN;
 import static io.streamthoughts.kafka.connect.filepulse.data.SimpleSchema.SCHEMA_BYTES;
 import static io.streamthoughts.kafka.connect.filepulse.data.SimpleSchema.SCHEMA_FLOAT_32;
@@ -128,8 +131,8 @@ static StructSchema struct() {
      *
      * @return the {@link Schema} instance.
      */
-    static MapSchema map(final Schema valueSchema) {
-        return new MapSchema(valueSchema);
+    static MapSchema map(final Map<String, ?> value, final Schema valueSchema) {
+        return valueSchema == null ? new LazyMapSchema(value) : new MapSchema(valueSchema);
     }
 
     /**
@@ -139,8 +142,8 @@ static MapSchema map(final Schema valueSchema) {
      *
      * @return the {@link Schema} instance.
      */
-    static ArraySchema array(final Schema valueSchema) {
-        return new ArraySchema(valueSchema);
+    static ArraySchema array(final Collection<?> value, final Schema valueSchema) {
+        return valueSchema == null ? new LazyArraySchema(value) : new ArraySchema(valueSchema);
     }
 
     /**
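Callers now hand the backing value to the factory together with an optional value schema; a null schema selects the lazy variant instead of wrapping null (the previous NPE source). A usage sketch — note that Schema.string() is an assumed simple-schema factory in the style of the SCHEMA_* constants imported above:

    import io.streamthoughts.kafka.connect.filepulse.data.ArraySchema;
    import io.streamthoughts.kafka.connect.filepulse.data.Schema;

    import java.util.List;
    import java.util.Map;

    public class SchemaFactorySketch {

        public static void main(String[] args) {
            // Element schema known up front: an eager ArraySchema, as before.
            ArraySchema eager = Schema.array(List.of("a", "b"), Schema.string());

            // Element schema unknown (e.g. a JSON array of objects): the factory
            // now returns a LazyArraySchema instead of new ArraySchema(null).
            ArraySchema lazy = Schema.array(List.of(Map.of("firstName", "foo")), null);
            System.out.println(lazy.valueSchema().type()); // inferred on first access
        }
    }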
StructSchema.java
@@ -225,6 +225,9 @@ public <T> T map(final SchemaMapper<T> mapper) {
         return mapper.map(this);
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public <T> T map(final SchemaMapperWithValue<T> mapper, final Object object) {
         return mapper.map(this, (TypedStruct) object);
TypedValue.java
@@ -139,7 +139,7 @@ public static TypedValue struct(final TypedStruct value) {
      * @return the new {@link TypedValue} instance.
      */
     public static TypedValue map(final Map<String, ?> value, final Type valueType) {
-        return new TypedValue(Schema.map(Schema.of(valueType)), value);
+        return new TypedValue(Schema.map(value, Schema.of(valueType)), value);
     }
 
     /**
@@ -150,7 +150,7 @@ public static TypedValue map(final Map<String, ?> value, final Type valueType) {
      * @return the new {@link TypedValue} instance.
      */
     public static TypedValue array(final Collection<?> value, final Schema valueSchema) {
-        return new TypedValue(Schema.array(valueSchema), value);
+        return new TypedValue(Schema.array(value, valueSchema), value);
     }
 
     /**
CommonFilterConfig.java
@@ -53,7 +53,7 @@ public class CommonFilterConfig extends AbstractConfig {
     public static final String FILTER_OVERWRITE_DOC = "The fields to overwrite.";
 
     public static final String FILTER_SOURCE_FIELD_CONFIG = "source";
-    private static final String FILTER_SOURCE_FIELD_DOC = "The input field on which to apply the filter.";
+    private static final String FILTER_SOURCE_FIELD_DOC = "The input field on which to apply the filter (default: message).";
 
     /**
      * Creates a new {@link CommonFilterConfig} instance.
@@ -100,7 +100,6 @@ public RecordFilterPipeline<FileRecord<TypedStruct>> onFailure() {
         for (String alias : filterAliases) {
             final String prefix = "filters." + alias + ".";
             try {
-                @SuppressWarnings("unchecked")
                 final RecordFilter filter = getClass(prefix + "type")
                         .asSubclass(RecordFilter.class)
                         .getDeclaredConstructor().newInstance();
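Dropping @SuppressWarnings("unchecked") is safe here because Class.asSubclass performs a checked conversion, so newInstance() is already statically typed to RecordFilter. The alias-based lookup pattern itself, isolated as a self-contained sketch (with a stand-in class name, not the project's code):

    import java.util.List;
    import java.util.Map;

    public class AliasReflectionSketch {

        public static void main(String[] args) throws Exception {
            // Each filter alias maps to a "filters.<alias>.type" key holding a
            // class name, instantiated through its no-arg constructor.
            Map<String, String> props = Map.of("filters.ParseJSON.type", "java.util.ArrayList");
            for (String alias : List.of("ParseJSON")) {
                final String prefix = "filters." + alias + ".";
                Object filter = Class.forName(props.get(prefix + "type"))
                        .getDeclaredConstructor()
                        .newInstance();
                System.out.println(alias + " -> " + filter.getClass().getName()); // ParseJSON -> java.util.ArrayList
            }
        }
    }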
ConnectSchemaMapper.java
@@ -25,6 +25,7 @@
 import io.streamthoughts.kafka.connect.filepulse.data.SchemaMapperWithValue;
 import io.streamthoughts.kafka.connect.filepulse.data.SimpleSchema;
 import io.streamthoughts.kafka.connect.filepulse.data.StructSchema;
+import io.streamthoughts.kafka.connect.filepulse.data.Type;
 import io.streamthoughts.kafka.connect.filepulse.data.TypedField;
 import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
 import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
@@ -34,16 +35,26 @@
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 public class ConnectSchemaMapper implements SchemaMapper<Schema>, SchemaMapperWithValue<SchemaAndValue> {
 
     private static final Object DEFAULT_NULL_VALUE = null;
 
     public static final ConnectSchemaMapper INSTANCE = new ConnectSchemaMapper();
+    private static final Pattern REGEX = Pattern.compile("[_\\-.]");
 
+    static String normalizeSchemaName(final String name) {
+        return Arrays
+                .stream(REGEX.split(name))
+                .map(it -> it.substring(0, 1).toUpperCase() + it.substring(1))
+                .collect(Collectors.joining());
+    }
 
     /**
      * {@inheritDoc}
@@ -84,12 +95,35 @@ public Schema map(final StructSchema schema) {
         }
 
         for (final TypedField field : schema) {
-            sb.field(field.name(), field.schema().map(this)).optional();
+            final io.streamthoughts.kafka.connect.filepulse.data.Schema fieldSchema = field.schema();
+            final String fieldName = field.name();
+            mayUpdateSchemaName(fieldSchema, fieldName);
+            sb.field(fieldName, fieldSchema.map(this)).optional();
         }
 
         return sb.build();
     }
 
+    private void mayUpdateSchemaName(final io.streamthoughts.kafka.connect.filepulse.data.Schema schema,
+                                     final String fieldName) {
+        if (schema.type() == Type.ARRAY) {
+            final ArraySchema arraySchema = (ArraySchema) schema;
+            mayUpdateSchemaName(arraySchema.valueSchema(), fieldName);
+        }
+
+        if (schema.type() == Type.MAP) {
+            final MapSchema mapSchema = (MapSchema) schema;
+            mayUpdateSchemaName(mapSchema.valueSchema(), fieldName);
+        }
+
+        if (schema.type() == Type.STRUCT) {
+            final StructSchema structSchema = (StructSchema) schema;
+            if (structSchema.name() == null) {
+                structSchema.name(normalizeSchemaName(fieldName));
+            }
+        }
+    }
+
     /**
      * {@inheritDoc}
      */
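Net effect: anonymous struct schemas nested anywhere under a field — mayUpdateSchemaName recurses through arrays and maps — receive a generated name derived from the field name, which converters for named record types (e.g. Avro) typically require. The naming rule splits on '_', '-' and '.', capitalizes each fragment, and joins; re-implemented below for illustration, since normalizeSchemaName is package-private:

    public class NormalizeSchemaNameSketch {

        public static void main(String[] args) {
            // Per the implementation above:
            //   "field-one"    -> "FieldOne"
            //   "my_field.sub" -> "MyFieldSub"
            //   "firstName"    -> "FirstName"
            System.out.println(normalize("field-one"));
        }

        // Illustrative re-implementation of normalizeSchemaName.
        static String normalize(final String name) {
            StringBuilder sb = new StringBuilder();
            for (String part : name.split("[_\\-.]")) {
                sb.append(Character.toUpperCase(part.charAt(0))).append(part.substring(1));
            }
            return sb.toString();
        }
    }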
JSONFilterConfig.java
@@ -30,7 +30,7 @@
 public class JSONFilterConfig extends CommonFilterConfig {
 
     public static final String JSON_TARGET_CONFIG = "target";
-    public static final String JSON_TARGET_DOC = "The target field to put the parsed JSON value";
+    public static final String JSON_TARGET_DOC = "The target field to put the parsed JSON value (optional)";
 
     public static final String JSON_SOURCE_CHARSET_CONFIG = "source.charset";
     public static final String JSON_SOURCE_CHARSET_DOC = "The charset to be used for reading the source" +
@@ -53,7 +53,7 @@ public String target() {
     }
 
     public Charset charset() {
-        String name = getString(JSON_TARGET_CONFIG);
+        String name = getString(JSON_SOURCE_CHARSET_CONFIG);
         return name == null ? StandardCharsets.UTF_8 : Charset.forName(name);
     }
 
@@ -63,9 +63,9 @@ public Set<String> overwrite() {
 
     public static ConfigDef configDef() {
         ConfigDef def = CommonFilterConfig.configDef()
-            .define(JSON_TARGET_CONFIG, ConfigDef.Type.STRING, null,
-                    ConfigDef.Importance.HIGH, JSON_TARGET_DOC)
-            .define(JSON_SOURCE_CHARSET_CONFIG, ConfigDef.Type.STRING, null,
+                .define(JSON_TARGET_CONFIG, ConfigDef.Type.STRING, null,
+                        ConfigDef.Importance.HIGH, JSON_TARGET_DOC)
+                .define(JSON_SOURCE_CHARSET_CONFIG, ConfigDef.Type.STRING, null,
                         ConfigDef.Importance.MEDIUM, JSON_SOURCE_CHARSET_DOC);
         CommonFilterConfig.withOverwrite(def);
         CommonFilterConfig.withSource(def);
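Two fixes above: charset() previously read the "target" key, so a configured source.charset was silently ignored and a configured target could even make Charset.forName fail on a field name; and the target option is now documented as optional. The fallback logic, isolated:

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;

    public class CharsetFallbackSketch {

        // Unset (null) means UTF-8; otherwise resolve the configured name.
        static Charset charsetOrUtf8(final String name) {
            return name == null ? StandardCharsets.UTF_8 : Charset.forName(name);
        }

        public static void main(String[] args) {
            System.out.println(charsetOrUtf8(null));         // UTF-8
            System.out.println(charsetOrUtf8("ISO-8859-1")); // ISO-8859-1
        }
    }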
DefaultJSONStructConverterTest.java
@@ -18,6 +18,8 @@
  */
 package io.streamthoughts.kafka.connect.filepulse.json;
 
+import io.streamthoughts.kafka.connect.filepulse.data.ArraySchema;
+import io.streamthoughts.kafka.connect.filepulse.data.Schema;
 import io.streamthoughts.kafka.connect.filepulse.data.StructSchema;
 import io.streamthoughts.kafka.connect.filepulse.data.Type;
 import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
@@ -26,12 +28,26 @@
 
 import java.util.Arrays;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 public class DefaultJSONStructConverterTest {
 
     private JSONStructConverter converter = new DefaultJSONStructConverter();
 
+    @Test
+    public void shouldConvertGivenFieldWithArrayOfComplexType() throws Exception {
+        TypedStruct struct = converter.readJson("{\"field-one\" : [{\"firstName\": \"foo\"}, {\"firstName\": \"bar\"}]}");
+        assertNotNull(struct);
+
+        StructSchema schema = struct.schema();
+        assertNotNull(schema.field("field-one"));
+        Schema fieldSchema = schema.field("field-one").schema();
+        assertEquals(Type.ARRAY, fieldSchema.type());
+        Schema arraySchema = ((ArraySchema) fieldSchema).valueSchema();
+        assertEquals(Type.STRUCT, arraySchema.type());
+    }
+
     @Test
     public void shouldConvertGivenFieldsWithStringType() throws Exception {
 
@@ -93,7 +109,6 @@ public void shouldConvertGivenFieldsWithNumberType() throws Exception {
         assertEquals(Long.MAX_VALUE, struct.getLong("field-long").longValue());
         assertEquals(Double.MAX_VALUE, struct.getDouble("field-double"), 0.0);
         assertEquals(Float.MAX_VALUE, struct.getDouble("field-float").floatValue(), 0.0);
-
     }
 
 }
