Skip to content

Commit

Permalink
feat(filters): add new filter ExplodeFilter (#52)
Browse files Browse the repository at this point in the history
This commit adds a new filter to explode an array
or list field into separate records.

Resolves: GH-52
  • Loading branch information
fhussonnois committed Aug 6, 2020
1 parent 636fd38 commit 9e90720
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2019-2020 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.streamthoughts.kafka.connect.filepulse.config;

import io.streamthoughts.kafka.connect.filepulse.filter.config.CommonFilterConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;

/**
 * Configuration for the {@code ExplodeFilter}.
 *
 * <p>Extends the common filter configuration with the {@code source}
 * and {@code overwrite} properties.
 */
public class ExplodeFilterConfig extends CommonFilterConfig {

    /**
     * Creates a new {@link ExplodeFilterConfig} instance.
     *
     * @param originals the filter configuration.
     */
    public ExplodeFilterConfig(final Map<?, ?> originals) {
        super(configDef(), originals);
    }

    /**
     * Returns the name of the field to explode.
     *
     * @return the configured source field name.
     */
    public String source() {
        return getString(CommonFilterConfig.FILTER_SOURCE_FIELD_CONFIG);
    }

    /**
     * Returns the {@link ConfigDef} for this filter: the common filter
     * definition augmented with the overwrite and source properties.
     *
     * @return the configuration definition.
     */
    public static ConfigDef configDef() {
        final ConfigDef definition = CommonFilterConfig.configDef();
        CommonFilterConfig.withOverwrite(definition);
        CommonFilterConfig.withSource(definition);
        return definition;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2019-2020 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.streamthoughts.kafka.connect.filepulse.filter;

import io.streamthoughts.kafka.connect.filepulse.config.ExplodeFilterConfig;
import io.streamthoughts.kafka.connect.filepulse.data.Type;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import org.apache.kafka.common.config.ConfigDef;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* The {@link ExplodeFilter} explodes an array or list field into separate records.
*/
/**
 * The {@link ExplodeFilter} explodes an array or list field into separate records.
 */
public class ExplodeFilter extends AbstractMergeRecordFilter<ExplodeFilter> {

    private ExplodeFilterConfig config;

    /**
     * {@inheritDoc}
     */
    @Override
    public void configure(final Map<String, ?> props) {
        super.configure(props);
        config = new ExplodeFilterConfig(props);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public ConfigDef configDef() {
        return ExplodeFilterConfig.configDef();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected RecordsIterable<TypedStruct> apply(final FilterContext context, final TypedStruct record) throws FilterException {
        final TypedValue source = requireNonNullValue(record.get(config.source()));

        // Only ARRAY-typed values can be exploded into multiple records.
        if (source.type() != Type.ARRAY) {
            throw new FilterException(
                "Invalid type for field '" + config.source() + "', expected ARRAY, was " + source.type());
        }

        // One output record per array element, each carrying the element
        // under the same source field name.
        final List<TypedStruct> records = source.getArray()
            .stream()
            .map(this::singleRecordFor)
            .collect(Collectors.toList());

        return new RecordsIterable<>(records);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected Set<String> overwrite() {
        final Set<String> fields = new HashSet<>();
        fields.add(config.source());
        return fields;
    }

    /**
     * Wraps a single exploded element into a new record keyed by the source field.
     */
    private TypedStruct singleRecordFor(final Object element) {
        return TypedStruct.create().put(config.source(), TypedValue.any(element));
    }

    /**
     * Ensures the source value is present before exploding it.
     *
     * @throws FilterException if the value is null.
     */
    private TypedValue requireNonNullValue(final TypedValue value) {
        if (value.isNull()) {
            throw new FilterException("Invalid field '" + config.source() + "', cannot explode empty value");
        }
        return value;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2019-2020 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.streamthoughts.kafka.connect.filepulse.filter;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.filter.config.CommonFilterConfig;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

import static io.streamthoughts.kafka.connect.filepulse.data.TypedStruct.create;

/**
 * Unit tests for {@link ExplodeFilter}.
 *
 * <p>Verifies that an array field is exploded into one record per element,
 * for primitive elements, complex (struct) elements, and a non-default
 * source field.
 */
public class ExplodeFilterTest {

    private static final List<String> PRIMITIVE_VALUES;
    private static final List<TypedStruct> COMPLEXES_VALUES;

    static {
        PRIMITIVE_VALUES = Arrays.asList("one", "two", "three", "four");
        COMPLEXES_VALUES = PRIMITIVE_VALUES.stream()
            .map(it -> TypedStruct.create().put("field", it))
            .collect(Collectors.toList());
    }
    private ExplodeFilter filter;

    @Before
    public void setUp() {
        filter = new ExplodeFilter();
        filter.configure(new HashMap<>());
    }

    @Test
    public void should_explode_input_given_array_of_primitive() {
        RecordsIterable<TypedStruct> result = filter.apply(null, create().put("message", PRIMITIVE_VALUES), false);
        Assert.assertEquals(PRIMITIVE_VALUES.size(), result.size());
        Assert.assertEquals(PRIMITIVE_VALUES, extractValues(result, it -> it.getString("message")));
    }

    @Test
    public void should_explode_input_given_array_of_complex() {
        RecordsIterable<TypedStruct> result = filter.apply(null, create().put("message", COMPLEXES_VALUES), false);
        Assert.assertEquals(COMPLEXES_VALUES.size(), result.size());
        Assert.assertEquals(COMPLEXES_VALUES, extractValues(result, it -> it.getStruct("message")));
    }

    @Test
    public void should_explode_input_given_sub_field_source() {
        // Plain map instead of double-brace initialization: the anonymous
        // subclass idiom retains a reference to the enclosing test instance.
        final HashMap<String, String> props = new HashMap<>();
        props.put(CommonFilterConfig.FILTER_SOURCE_FIELD_CONFIG, "values");
        filter.configure(props);

        RecordsIterable<TypedStruct> result = filter.apply(null, create()
            .put("message", "dummy")
            .put("values", PRIMITIVE_VALUES), false);
        Assert.assertEquals(PRIMITIVE_VALUES.size(), result.size());
        Assert.assertEquals(PRIMITIVE_VALUES, extractValues(result, it -> it.getString("values")));
    }

    /**
     * Extracts one value per output record using the given extractor.
     * (Renamed from {@code extractStringValues}: the helper is generic and
     * also extracts {@link TypedStruct} values, not only strings.)
     */
    private <T> List<T> extractValues(final RecordsIterable<TypedStruct> result,
                                      final Function<TypedStruct, T> extractor) {
        return result.stream().map(extractor).collect(Collectors.toList());
    }
}

0 comments on commit 9e90720

Please sign in to comment.