Skip to content

Commit

Permalink
fix(filter): fix ConvertFilter (#61)
Browse files Browse the repository at this point in the history
Resolves: #61
  • Loading branch information
fhussonnois committed Aug 13, 2020
1 parent b0c6d7c commit c3997c7
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ public TypedStruct put(final String field, final Schema schema, final Object obj
this.schema.field(field, schema);
values.add(object);
} else {

int index = this.schema.indexOf(field);
this.schema.set(field, schema); // handle case where field's schema is changed.
values.set(index, object);
Expand Down Expand Up @@ -320,6 +319,51 @@ public <K, V> Map<K, V> getMap(final String field) throws DataException {
return getCheckedType(field, Type.MAP);
}

public TypedValue find(final String path) {
if (has(path)) return get(path);

if (isDotPropertyAccessPath(path)) {
String[] split = path.split("\\.", 2);
if (has(split[0])) {
TypedValue child = get(split[0]);
if (child.schema().type() == Type.STRUCT) {
return child.getStruct().find(split[1]);
}
}
}
return null;
}

public TypedStruct insert(final String path, final Object value) {
if (path == null || path.isEmpty()) {
throw new IllegalArgumentException("Cannot insert value given null or empty path");
}
doInsert(path, (value instanceof TypedValue) ? (TypedValue)value : TypedValue.any(value));
return this;
}

private void doInsert(final String path, final TypedValue value) {
if (isDotPropertyAccessPath(path)) {
String[] split = path.split("\\.", 2);
final String field = split[0];
final String remaining = split[1];
TypedStruct child;
if (has(field)) {
child = getStruct(field);
} else {
child = new TypedStruct();
put(field, child);
}
child.doInsert(remaining, value);
} else {
put(path, value);
}
}

private static boolean isDotPropertyAccessPath(final String name) {
return name.contains(".");
}

public TypedValue first(final String fieldName) {
TypedField field = field(fieldName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
*/
package io.streamthoughts.kafka.connect.filepulse.data;

import org.junit.Assert;
import org.junit.Test;

import javax.validation.constraints.AssertTrue;

import static io.streamthoughts.kafka.connect.filepulse.data.TypedStruct.*;
import static org.junit.Assert.*;

public class TypedStructTest {
Expand All @@ -36,13 +40,13 @@ public class TypedStructTest {

@Test(expected = DataException.class)
public void shouldThrowExceptionGivenInvalidFieldName() {
TypedStruct struct = TypedStruct.create();
TypedStruct struct = create();
struct.get(STRING_FIELD_1);
}

@Test
public void shouldReturnFieldPreviouslyAdded() {
TypedStruct struct = TypedStruct.create()
TypedStruct struct = create()
.put(STRING_FIELD_1, STRING_VALUE_1);

TypedValue typed = struct.get(STRING_FIELD_1);
Expand All @@ -53,7 +57,7 @@ public void shouldReturnFieldPreviouslyAdded() {

@Test
public void shouldIncrementIndexWhilePuttingNewFields() {
TypedStruct struct = TypedStruct.create()
TypedStruct struct = create()
.put(STRING_FIELD_1, STRING_VALUE_1)
.put(STRING_FIELD_2, STRING_VALUE_2);

Expand All @@ -63,7 +67,7 @@ public void shouldIncrementIndexWhilePuttingNewFields() {

@Test
public void shouldRemoveAndReIndexFieldsGivenValidFieldName() {
final TypedStruct struct = TypedStruct.create()
final TypedStruct struct = create()
.put(STRING_FIELD_1, STRING_VALUE_1)
.put(STRING_FIELD_2, STRING_FIELD_2)
.put(STRING_FIELD_3, STRING_VALUE_3)
Expand All @@ -80,12 +84,36 @@ public void shouldRemoveAndReIndexFieldsGivenValidFieldName() {

@Test
public void shouldRenameGivenValidFieldName() {
final TypedStruct struct = TypedStruct.create()
final TypedStruct struct = create()
.put(STRING_FIELD_1, STRING_VALUE_1);

struct.rename(STRING_FIELD_1, STRING_FIELD_2);

assertFalse(struct.has(STRING_FIELD_1));
assertTrue(struct.has(STRING_FIELD_2));
}

@Test
public void shouldReturnValueWhenUsingFindGivenValidPath() {
TypedStruct struct = create().put("foo", create().put("bar", "value"));
Assert.assertEquals("value", struct.find("foo.bar").getString());
}

@Test
public void shouldReturnNullWhenUsingFindGivenInvalidPath() {
TypedStruct struct = create().put("foo", create().put("bar", "value"));
Assert.assertNull(struct.find("foo.foo"));
}

@Test
public void shouldInsertValueGivenValidPath() {
TypedStruct struct = create()
.insert("first.child", "v1")
.insert("foo", "v2");

Assert.assertEquals("v1", struct.getStruct("first").getString("child"));
Assert.assertEquals("v2", struct.getString("foo"));


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ public Object readValue(final EvaluationContext context) {
return readValue(context, Object.class);
}

public String getRootObject() {
return rootObject;
}

public String getAttribute() {
return attribute;
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,11 @@ public TypedValue read(final EvaluationContext context,
Objects.requireNonNull(target, "target cannot be null");
Objects.requireNonNull(name, "name cannot be null");

final TypedStruct struct = (TypedStruct)target;

if (struct.has(name)) {
return struct.get(name);
}

if (isDotPropertyAccessPath(name)) {
String[] split = name.split("\\.", 2);
Object rootObject = read(context, target, split[0]);
if (rootObject != null) {
return read(context, rootObject, split[1]);
}
final TypedValue value = ((TypedStruct) target).find(name);
if (value == null) {
throw new AccessException("Cannot access to field '" + name + "'");
}

throw new AccessException("Cannot access to field '" + name + "'");
}

private boolean isDotPropertyAccessPath(final String name) {
return name.contains(".");
return value;
}

/**
Expand All @@ -87,27 +73,7 @@ public void write(final EvaluationContext context,

Objects.requireNonNull(target, "target cannot be null");
Objects.requireNonNull(name, "name cannot be null");

final TypedStruct struct = (TypedStruct)target;

if (isDotPropertyAccessPath(name)) {
String[] split = name.split("\\.", 2);
final String field = split[0];
final String remaining = split[1];

TypedStruct child;
if (struct.has(field)) {
child = struct.getStruct(field);
} else {
child = TypedStruct.create();
}
write(context, child, remaining, newValue);

} else if (newValue instanceof TypedValue) {
struct.put(name, (TypedValue)newValue);
} else {
struct.put(name, TypedValue.any(newValue));
}
((TypedStruct)target).insert(name, newValue);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,21 @@
package io.streamthoughts.kafka.connect.filepulse.config;

import io.streamthoughts.kafka.connect.filepulse.data.Type;
import org.apache.kafka.common.config.AbstractConfig;
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;

public class ConvertFilterConfig extends AbstractConfig {
public class ConvertFilterConfig extends CommonFilterConfig {

public static final String CONVERT_FIELD_CONFIG = "field";
private static final String CONVERT_FIELD_DOC = "The field to convert";
public static final String CONVERT_FIELD_CONFIG = "field";
private static final String CONVERT_FIELD_DOC = "The field to convert (dot notation is supported)";

public static final String CONVERT_TYPE_CONFIG = "type";
private static final String CONVERT_TYPE_DOC = "The type field must be converted to";
public static final String CONVERT_TO_CONFIG = "to";
private static final String CONVERT_TO_DOC = "The type to which the field must be converted";

public static final String CONVERT_DEFAULT_CONFIG = "default";
private static final String CONVERT_DEFAULT_DOC = "The default value to apply if the field cannot be converted";

public static final String CONVERT_IGNORE_MISSING_CONFIG = "ignoreMissing";
private static final String CONVERT_IGNORE_MISSING_DOC = "If true and field does not exist the filter will be apply successfully without modifying the value. If field is null the schema will be modified.";
Expand All @@ -48,8 +51,13 @@ public String field() {
return getString(CONVERT_FIELD_CONFIG);
}

public Type type() {
return Type.valueOf(getString(CONVERT_TYPE_CONFIG).toUpperCase());
public Type to() {
return Type.valueOf(getString(CONVERT_TO_CONFIG).toUpperCase());
}

public TypedValue defaultValue() {
String defaultValue = getString(CONVERT_DEFAULT_CONFIG);
return defaultValue != null ? TypedValue.any(defaultValue).as(to()) : null;
}

public boolean ignoreMissing() {
Expand All @@ -58,13 +66,16 @@ public boolean ignoreMissing() {

public static ConfigDef configDef() {
return CommonFilterConfig.configDef()
.define(CONVERT_FIELD_CONFIG, ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, CONVERT_FIELD_DOC)
.define(CONVERT_FIELD_CONFIG, ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, CONVERT_FIELD_DOC)

.define(CONVERT_TO_CONFIG, ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, CONVERT_TO_DOC)

.define(CONVERT_TYPE_CONFIG, ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, CONVERT_TYPE_DOC)
.define(CONVERT_DEFAULT_CONFIG, ConfigDef.Type.STRING, null,
ConfigDef.Importance.HIGH, CONVERT_DEFAULT_DOC)

.define(CONVERT_IGNORE_MISSING_CONFIG, ConfigDef.Type.BOOLEAN, true,
ConfigDef.Importance.HIGH, CONVERT_IGNORE_MISSING_DOC);
.define(CONVERT_IGNORE_MISSING_CONFIG, ConfigDef.Type.BOOLEAN, true,
ConfigDef.Importance.HIGH, CONVERT_IGNORE_MISSING_DOC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

import java.util.Map;

import static io.streamthoughts.kafka.connect.filepulse.config.ConvertFilterConfig.CONVERT_IGNORE_MISSING_CONFIG;

public class ConvertFilter extends AbstractRecordFilter<ConvertFilter> {

private ConvertFilterConfig config;
Expand Down Expand Up @@ -55,14 +57,25 @@ public RecordsIterable<TypedStruct> apply(final FilterContext context,
final TypedStruct record,
final boolean hasNext) throws FilterException {

if (record.has(config.field())) {
TypedValue value = record.get(config.field());
TypedValue converted = value.as(config.type());

record.put(config.field(), converted);
final String fieldName = config.field();
final TypedValue value = record.find(fieldName);
if (value != null) {
try {
TypedValue converted = value.as(config.to());
record.insert(fieldName, converted);
} catch (Exception e) {
if (config.defaultValue() == null) {
throw new FilterException(
"Fail to convert field '" + fieldName + "' to type " + config.to() + ": " + e.getMessage()
);
}
record.insert(fieldName, config.defaultValue());
}

} else if (!config.ignoreMissing()) {
throw new FilterException("Cannot find field with name '" + config.field() + "'");
throw new FilterException(
"Cannot find field for name '" + fieldName + "' (" + CONVERT_IGNORE_MISSING_CONFIG+ "=false)"
);
}

return RecordsIterable.of(record);
Expand Down
Loading

0 comments on commit c3997c7

Please sign in to comment.