fix(filter): add support for Arrays to JSONFilter (#47)

fhussonnois committed Aug 10, 2020
1 parent 3f05bb9 commit 5cea1e2

Showing 7 changed files with 218 additions and 55 deletions.
TypedField.java

@@ -65,10 +65,10 @@ public String name() {
     public boolean equals(Object o) {
         if (this == o) return true;
         if (!(o instanceof TypedField)) return false;
-        TypedField typeField = (TypedField) o;
-        return index == typeField.index &&
-                schema == typeField.schema &&
-                Objects.equals(name, typeField.name);
+        TypedField that = (TypedField) o;
+        return index == that.index &&
+                Objects.equals(schema, that.schema) &&
+                Objects.equals(name, that.name);
     }

     /**
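For context, the fix above swaps reference equality on schema for a null-safe value comparison. A tiny standalone illustration of the difference (String is used here only as a stand-in for the schema type):

import java.util.Objects;

public class EqualsIllustration {

    public static void main(String[] args) {
        // Two equal-but-distinct instances: '==' compares references,
        // Objects.equals compares values and also tolerates nulls.
        String a = new String("schema");
        String b = new String("schema");

        System.out.println(a == b);                      // false
        System.out.println(Objects.equals(a, b));        // true
        System.out.println(Objects.equals(null, null));  // true
    }
}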
JSONFilterConfig.java

@@ -29,12 +29,20 @@

 public class JSONFilterConfig extends CommonFilterConfig {

     public static final String JSON_TARGET_CONFIG = "target";
     public static final String JSON_TARGET_DOC = "The target field to put the parsed JSON value (optional)";

+    public static final String JSON_MERGE_CONFIG = "merge";
+    public static final String JSON_MERGE_DOC = "A boolean that specifies whether to merge the JSON " +
+            "object into the top level of the input record (default: false).";
+
+    public static final String JSON_EXPLODE_ARRAY_CONFIG = "explode.array";
+    public static final String JSON_EXPLODE_ARRAY_DOC = "A boolean that specifies whether to explode arrays " +
+            "into separate records (default: false)";
+
     public static final String JSON_SOURCE_CHARSET_CONFIG = "source.charset";
-    public static final String JSON_SOURCE_CHARSET_DOC = "The charset to be used for reading the source" +
-            " field (default: UTF-8)";
+    public static final String JSON_SOURCE_CHARSET_DOC = "The charset to be used for reading the source " +
+            "field (default: UTF-8)";

     /**
      * Creates a new {@link JSONFilterConfig} instance.

@@ -52,6 +60,14 @@ public String target() {
         return getString(JSON_TARGET_CONFIG);
     }

+    public boolean explode() {
+        return getBoolean(JSON_EXPLODE_ARRAY_CONFIG);
+    }
+
+    public boolean merge() {
+        return getBoolean(JSON_MERGE_CONFIG);
+    }
+
     public Charset charset() {
         String name = getString(JSON_SOURCE_CHARSET_CONFIG);
         return name == null ? StandardCharsets.UTF_8 : Charset.forName(name);

@@ -66,7 +82,11 @@ public static ConfigDef configDef() {
                 .define(JSON_TARGET_CONFIG, ConfigDef.Type.STRING, null,
                         ConfigDef.Importance.HIGH, JSON_TARGET_DOC)
                 .define(JSON_SOURCE_CHARSET_CONFIG, ConfigDef.Type.STRING, null,
-                        ConfigDef.Importance.MEDIUM, JSON_SOURCE_CHARSET_DOC);
+                        ConfigDef.Importance.MEDIUM, JSON_SOURCE_CHARSET_DOC)
+                .define(JSON_EXPLODE_ARRAY_CONFIG, ConfigDef.Type.BOOLEAN, false,
+                        ConfigDef.Importance.MEDIUM, JSON_EXPLODE_ARRAY_DOC)
+                .define(JSON_MERGE_CONFIG, ConfigDef.Type.BOOLEAN, false,
+                        ConfigDef.Importance.MEDIUM, JSON_MERGE_DOC);
         CommonFilterConfig.withOverwrite(def);
         CommonFilterConfig.withSource(def);
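A short usage sketch of the two new options (hypothetical snippet in the same package; only "merge" and "explode.array" are defined by this commit, and the "source" key name is assumed to be the one declared by CommonFilterConfig):

import java.util.Map;

public class JSONFilterConfigExample {

    public static void main(String[] args) {
        JSONFilterConfig config = new JSONFilterConfig(Map.of(
                "source", "message",      // field holding the raw JSON string (assumed key)
                "explode.array", "true",  // new in this commit
                "merge", "false"          // new in this commit
        ));

        System.out.println(config.explode());  // true
        System.out.println(config.merge());    // false
        System.out.println(config.charset());  // UTF-8 (default when unset)
    }
}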
JSONFilter.java

@@ -19,38 +19,34 @@
 package io.streamthoughts.kafka.connect.filepulse.filter;

 import io.streamthoughts.kafka.connect.filepulse.config.JSONFilterConfig;
+import io.streamthoughts.kafka.connect.filepulse.data.ArraySchema;
+import io.streamthoughts.kafka.connect.filepulse.data.Type;
 import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
 import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
 import io.streamthoughts.kafka.connect.filepulse.json.DefaultJSONStructConverter;
 import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
 import org.apache.kafka.common.config.ConfigDef;

-import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;

 public class JSONFilter extends AbstractMergeRecordFilter<JSONFilter> {

     private final DefaultJSONStructConverter converter = new DefaultJSONStructConverter();

     private JSONFilterConfig configs;

-    private String source;
-
-    private String target;
-
-    private Charset charset;
-
     /**
      * {@inheritDoc}
      */
     @Override
     public void configure(final Map<String, ?> props) {
         super.configure(props);
         configs = new JSONFilterConfig(props);
-        source = configs.source();
-        target = configs.target();
-        charset = configs.charset();
     }

     /**

@@ -66,44 +62,95 @@ public ConfigDef configDef() {
      */
     @Override
     protected RecordsIterable<TypedStruct> apply(final FilterContext context, final TypedStruct record) {
-        final String value = extractJsonField(checkIsNotNull(record.get(source)));
+        final String value = extractJsonField(checkIsNotNull(record.get(configs.source())));
+        final TypedValue typedValue;

         try {
-            final TypedStruct json = converter.readJson(value);
-            if (target != null) {
-                record.put(target, json);
-                return RecordsIterable.of(record);
-            }
-            return RecordsIterable.of(json);
+            typedValue = converter.readJson(value);
         } catch (Exception e) {
             throw new FilterException(e.getLocalizedMessage(), e.getCause());
         }
+
+        final Type type = typedValue.type();
+
+        if (type != Type.ARRAY && type != Type.STRUCT) {
+            throw new FilterException(
+                "Cannot process JSON value with unsupported type. Expected Array or Object, was " + type);
+        }
+
+        if (type == Type.STRUCT && configs.merge()) {
+            return RecordsIterable.of(typedValue.getStruct());
+        }
+
+        if (type == Type.ARRAY) {
+            if (configs.explode()) {
+
+                Collection<?> items = typedValue.getArray();
+                ArraySchema arraySchema = (ArraySchema) typedValue.schema();
+                Type arrayValueType = arraySchema.valueSchema().type();
+
+                if (configs.merge()) {
+                    if (arrayValueType == Type.STRUCT) {
+                        final List<TypedStruct> records = items
+                            .stream()
+                            .map(it -> TypedValue.any(it).getStruct())
+                            .collect(Collectors.toList());
+                        return new RecordsIterable<>(records);
+                    }
+
+                    throw new FilterException(
+                        "Unsupported operation. Cannot merge array value of type '"
+                        + arrayValueType + "' into the top level of the input record");
+                }
+
+                final List<TypedStruct> records = items
+                    .stream()
+                    .map(it -> TypedStruct.create().put(targetField(), TypedValue.of(it, arrayValueType)))
+                    .collect(Collectors.toList());
+                return new RecordsIterable<>(records);
+            }
+
+            if (configs.merge()) {
+                throw new FilterException(
+                    "Unsupported operation. Cannot merge JSON Array into the top level of the input record");
+            }
+        }
+
+        return RecordsIterable.of(TypedStruct.create().put(targetField(), typedValue));
     }

     private String extractJsonField(final TypedValue value) {
         switch (value.type()) {
             case STRING:
                 return value.getString();
             case BYTES:
-                return new String(value.getBytes(), charset);
+                return new String(value.getBytes(), configs.charset());
             default:
                 throw new FilterException(
-                    "Invalid field '" + source + "', cannot parse JSON field of type '" + value.type() + "'");
+                    "Invalid field '" + configs.source() + "', cannot parse JSON field of type '" + value.type() + "'"
+                );
         }
     }

     private TypedValue checkIsNotNull(final TypedValue value) {
         if (value.isNull()) {
             throw new FilterException(
-                "Invalid field '" + source + "', cannot convert empty value to JSON");
+                "Invalid field '" + configs.source() + "', cannot convert empty value to JSON");
         }
         return value;
     }

+    private String targetField() {
+        return configs.target() != null ? configs.target() : configs.source();
+    }
+
     /**
      * {@inheritDoc}
      */
     @Override
     protected Set<String> overwrite() {
-        return configs.overwrite();
+        return configs.target() == null && !configs.merge()
+            ? Collections.singleton(configs.source())
+            : configs.overwrite();
     }
 }
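Putting the rewritten apply() together: objects can be merged into the record or nested under a target field, and arrays can be exploded into one record per element. A test-style sketch of the explode path (assumptions: the class sits in the same package so the protected apply(...) is visible, a null FilterContext is tolerated since this path never reads it, RecordsIterable is iterable, and the "source" key name comes from CommonFilterConfig):

package io.streamthoughts.kafka.connect.filepulse.filter;

import java.util.Map;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;

public class JSONFilterExplodeExample {

    public static void main(String[] args) {
        JSONFilter filter = new JSONFilter();
        filter.configure(Map.of(
                "source", "message",     // field holding the raw JSON string (assumed key)
                "explode.array", "true"  // new in this commit
        ));

        TypedStruct record = TypedStruct.create()
                .put("message", TypedValue.any("[{\"id\":1},{\"id\":2}]"));

        // explode.array=true, merge=false: one output record per array element,
        // each parsed element stored back under "message" (no target configured).
        RecordsIterable<TypedStruct> out = filter.apply(null, record);
        out.forEach(r -> System.out.println(r.get("message")));
    }
}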
DefaultJSONStructConverter.java

@@ -35,15 +35,14 @@
 public class DefaultJSONStructConverter implements JSONStructConverter {

     private static final Map<ValueType, JsonFieldAccessor<?>> ACCESSORS = new HashMap<>();
-    private static final ObjectJsonFieldAccessor DEFAULT_ACCESSOR = new ObjectJsonFieldAccessor();

     /**
      * Creates a new {@link DefaultJSONStructConverter} instance.
      */
     public DefaultJSONStructConverter() {
         ACCESSORS.put(ValueType.ARRAY, new ArrayJsonFieldAccessor());
         ACCESSORS.put(ValueType.STRING, new StringJsonFieldAccessor());
-        ACCESSORS.put(ValueType.OBJECT, DEFAULT_ACCESSOR);
+        ACCESSORS.put(ValueType.OBJECT, new ObjectJsonFieldAccessor());
         ACCESSORS.put(ValueType.BOOLEAN, new BooleanJsonFieldAccessor());
         ACCESSORS.put(ValueType.NUMBER, new NumberJsonFieldAccessor());
     }

@@ -65,15 +64,13 @@ private static JsonFieldAccessor<?> getAccessorForType(final ValueType type) {
     /**
      * {@inheritDoc}
      */
     @Override
-    public TypedStruct readJson(final String data) {
+    public TypedValue readJson(final String data) {

-        if (data == null) {
-            return null;
-        }
+        if (data == null) return null;

         try {
             JsonIterator it = JsonIterator.parse(data);
-            return DEFAULT_ACCESSOR.read(it).getStruct();
+            return getAccessorForType(it.whatIsNext()).read(it);

         } catch (Exception e) {
             throw new ReaderException("Error while reading json value, invalid JSON message.", e);
JSONStructConverter.java

@@ -19,6 +19,7 @@
 package io.streamthoughts.kafka.connect.filepulse.json;

 import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
+import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;

 /**
  * Default interface to manage conversion from input JSON message to {@link TypedStruct} object.

@@ -31,5 +32,5 @@ public interface JSONStructConverter {
      * @param data the json message to convert to {@link TypedStruct}.
      * @return the new {@link TypedStruct} instance.
      */
-    TypedStruct readJson(final String data) throws Exception;
+    TypedValue readJson(final String data) throws Exception;
 }
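The interface change above is what enables array support: readJson now yields a TypedValue whose top-level type can be inspected before wrapping. A minimal sketch:

package io.streamthoughts.kafka.connect.filepulse.json;

import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;

public class ReadJsonExample {

    public static void main(String[] args) throws Exception {
        JSONStructConverter converter = new DefaultJSONStructConverter();

        // Callers can now branch on the top-level JSON type instead of
        // assuming every payload is an object.
        TypedValue object = converter.readJson("{\"id\": 1}");
        System.out.println(object.type());  // STRUCT

        TypedValue array = converter.readJson("[1, 2, 3]");
        System.out.println(array.type());   // ARRAY
    }
}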