Skip to content

Commit

Permalink
feat(filters): add built-in ExcludeFieldsMatchingPatternFilter
Browse files Browse the repository at this point in the history
  • Loading branch information
TheWorkshopCom authored and fhussonnois committed Sep 12, 2023
1 parent 6752c23 commit e854995
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.regex.Pattern;
import org.apache.kafka.common.config.ConfigDef;

public class ExcludeFieldsMatchingPatternsConfig extends CommonFilterConfig {
public class ExcludeFieldsMatchingPatternConfig extends CommonFilterConfig {
public static final String EXCLUDE_FIELDS_REGEX_CONFIG = "regex";

private static final String EXCLUDE_FIELDS_REGEX_CONFIG_DOC = "Regexp pattern applied to a field value to determine if the fields should be propagated or not.";
Expand All @@ -31,7 +31,7 @@ public class ExcludeFieldsMatchingPatternsConfig extends CommonFilterConfig {

private static final String EXCLUDE_FIELDS_BLOCK_FIELD_CONFIG_DOC = "If true, omits propagating the field downstream. Otherwise, propagates the field with a null value";

public ExcludeFieldsMatchingPatternsConfig(Map<?, ?> originals) {
public ExcludeFieldsMatchingPatternConfig(Map<?, ?> originals) {
super(configDef(), originals);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package io.streamthoughts.kafka.connect.filepulse.filter;

import io.streamthoughts.kafka.connect.filepulse.config.ExcludeFieldsMatchingPatternsConfig;
import io.streamthoughts.kafka.connect.filepulse.config.ExcludeFieldsMatchingPatternConfig;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
Expand All @@ -27,19 +27,19 @@
import java.util.regex.Matcher;
import org.apache.kafka.common.config.ConfigDef;

public class ExcludeFieldsMatchingPatternsFilter extends AbstractRecordFilter<ExcludeFieldsMatchingPatternsFilter> {
public class ExcludeFieldsMatchingPatternFilter extends AbstractRecordFilter<ExcludeFieldsMatchingPatternFilter> {

private ExcludeFieldsMatchingPatternsConfig config;
private ExcludeFieldsMatchingPatternConfig config;

@Override
public ConfigDef configDef() {
return ExcludeFieldsMatchingPatternsConfig.configDef();
return ExcludeFieldsMatchingPatternConfig.configDef();
}

@Override
public void configure(Map<String, ?> configs) {
super.configure(configs);
this.config = new ExcludeFieldsMatchingPatternsConfig(configs);
this.config = new ExcludeFieldsMatchingPatternConfig(configs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
*/
package io.streamthoughts.kafka.connect.filepulse.filter;

import static io.streamthoughts.kafka.connect.filepulse.config.ExcludeFieldsMatchingPatternsConfig.EXCLUDE_FIELDS_BLOCK_FIELD_CONFIG;
import static io.streamthoughts.kafka.connect.filepulse.config.ExcludeFieldsMatchingPatternsConfig.EXCLUDE_FIELDS_REGEX_CONFIG;
import static io.streamthoughts.kafka.connect.filepulse.config.ExcludeFieldsMatchingPatternConfig.EXCLUDE_FIELDS_BLOCK_FIELD_CONFIG;
import static io.streamthoughts.kafka.connect.filepulse.config.ExcludeFieldsMatchingPatternConfig.EXCLUDE_FIELDS_REGEX_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand All @@ -32,12 +32,12 @@
import java.util.Map;
import org.junit.jupiter.api.Test;

class ExcludeFieldsMatchingPatternsFilterTest {
class ExcludeFieldsMatchingPatternFilterTest {

@Test
void when_record_null_apply_should_return_empty_iterable() {
FilterContext context = mock(FilterContext.class);
ExcludeFieldsMatchingPatternsFilter filter = new ExcludeFieldsMatchingPatternsFilter();
ExcludeFieldsMatchingPatternFilter filter = new ExcludeFieldsMatchingPatternFilter();
filter.configure(Map.of(EXCLUDE_FIELDS_REGEX_CONFIG, "[a-z]"));

RecordsIterable<TypedStruct> iterable = filter.apply(context, null, false);
Expand All @@ -48,7 +48,7 @@ void when_record_null_apply_should_return_empty_iterable() {
@Test
void when_record_apply_should_propagate_null_for_fields_matching_regex_and_propagate_all_other_fields() {
FilterContext context = mock(FilterContext.class);
ExcludeFieldsMatchingPatternsFilter filter = new ExcludeFieldsMatchingPatternsFilter();
ExcludeFieldsMatchingPatternFilter filter = new ExcludeFieldsMatchingPatternFilter();
filter.configure(Map.of(EXCLUDE_FIELDS_REGEX_CONFIG, Fixture.nullFieldRegex));

TypedStruct typedStruct = TypedStruct.create()
Expand Down Expand Up @@ -77,7 +77,7 @@ void when_record_apply_should_propagate_null_for_fields_matching_regex_and_propa
@Test
void when_record_apply_should_block_fields_matching_regex_and_propagate_all_other_fields() {
FilterContext context = mock(FilterContext.class);
ExcludeFieldsMatchingPatternsFilter filter = new ExcludeFieldsMatchingPatternsFilter();
ExcludeFieldsMatchingPatternFilter filter = new ExcludeFieldsMatchingPatternFilter();
filter.configure(Map.of(EXCLUDE_FIELDS_REGEX_CONFIG, Fixture.nullFieldRegex,
EXCLUDE_FIELDS_BLOCK_FIELD_CONFIG, "true"));

Expand Down

0 comments on commit e854995

Please sign in to comment.