From abbdb2028817ef1ced4e16c62092ece7b92a7041 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 14 Mar 2024 11:42:39 -0700 Subject: [PATCH 1/5] Add key filter support --- .../OpenapiAsyncCompositeBindingAdapter.java | 25 ++++++-- .../config/HttpKafkaWithFetchConfig.java | 2 - .../HttpKafkaWithFetchConfigBuilder.java | 1 - .../HttpKafkaWithFetchFilterConfig.java | 14 ++++- ...HttpKafkaWithFetchFilterConfigBuilder.java | 62 +++++++++++++++++++ .../HttpKafkaWithFetchFilterHeaderConfig.java | 15 ++++- ...fkaWithFetchFilterHeaderConfigBuilder.java | 61 ++++++++++++++++++ .../HttpKafkaWithFetchConfigAdapter.java | 2 + .../config/HttpKafkaWithResolver.java | 2 + .../HttpKafkaWithConfigAdapterTest.java | 2 + 10 files changed, 177 insertions(+), 9 deletions(-) rename runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/{internal => }/config/HttpKafkaWithFetchFilterConfig.java (66%) create mode 100644 runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterConfigBuilder.java rename runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/{internal => }/config/HttpKafkaWithFetchFilterHeaderConfig.java (60%) create mode 100644 runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterHeaderConfigBuilder.java diff --git a/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java b/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java index 81344c31f0..dcbbabac85 100644 --- a/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java +++ b/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java @@ -15,6 +15,7 @@ package io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config; import static io.aklivity.zilla.runtime.engine.config.KindConfig.PROXY; +import static java.util.Collections.emptyList; import static java.util.stream.Collectors.toList; import java.util.List; @@ -30,6 +31,7 @@ import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithConfigBuilder; import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchConfig; import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchConfigBuilder; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchFilterConfig; import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchMergeConfig; import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceConfig; import io.aklivity.zilla.runtime.binding.openapi.asyncapi.config.OpenapiAsyncapiOptionsConfig; @@ -49,8 +51,10 @@ public final class OpenapiAsyncCompositeBindingAdapter implements CompositeBindingAdapterSpi { private static final Pattern JSON_CONTENT_TYPE = Pattern.compile("^application/(?:.+\\+)?json$"); + private static final Pattern PATH_ID_PATTERN = Pattern.compile("\\{id\\}"); private final Matcher jsonContentType = JSON_CONTENT_TYPE.matcher(""); + private final Matcher pathId = PATH_ID_PATTERN.matcher(""); @Override public String type() @@ -119,6 +123,8 @@ private BindingConfigBuilder injectHttpKafkaRoutes( final AsyncapiChannelView channel = AsyncapiChannelView .of(asyncapi.channels, asyncapiOperation.channel); + final boolean includeKey = pathId.reset(item).find(); + binding .route() .exit(qname) @@ -127,7 +133,7 @@ private BindingConfigBuilder injectHttpKafkaRoutes( .path(item) .build() .inject(r -> injectHttpKafkaRouteWith(r, openapi, openapiOperation, - asyncapiOperation.action, channel.address())) + asyncapiOperation.action, channel.address(), includeKey)) .build(); } } @@ -142,7 +148,8 @@ private RouteConfigBuilder injectHttpKafkaRouteWith( Openapi openapi, OpenapiOperation operation, String action, - String address) + String address, + boolean includeKey) { final HttpKafkaWithConfigBuilder newWith = HttpKafkaWithConfig.builder(); @@ -151,7 +158,7 @@ private RouteConfigBuilder injectHttpKafkaRouteWith( case "receive": newWith.fetch(HttpKafkaWithFetchConfig.builder() .topic(address) - .inject(with -> this.injectHttpKafkaRouteFetchWith(with, openapi, operation)) + .inject(with -> this.injectHttpKafkaRouteFetchWith(with, openapi, operation, includeKey)) .build()); break; case "send": @@ -170,7 +177,8 @@ private RouteConfigBuilder injectHttpKafkaRouteWith( private HttpKafkaWithFetchConfigBuilder injectHttpKafkaRouteFetchWith( HttpKafkaWithFetchConfigBuilder fetch, Openapi openapi, - OpenapiOperation operation) + OpenapiOperation operation, + boolean includeKey) { merge: for (Map.Entry response : operation.responses.entrySet()) @@ -181,10 +189,19 @@ private HttpKafkaWithFetchConfigBuilder injectHttpKafkaRouteFetchWith( { fetch.merged(HttpKafkaWithFetchMergeConfig.builder() .contentType("application/json") + .initial("[]") + .path("/-") .build()); break merge; } + if (includeKey) + { + fetch.filters(List.of(HttpKafkaWithFetchFilterConfig.builder() + .key("${params.id}") + .headers(emptyList()) + .build())); + } } return fetch; } diff --git a/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchConfig.java b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchConfig.java index 7c201c8c0a..c90f6cf805 100644 --- a/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchConfig.java +++ b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchConfig.java @@ -18,8 +18,6 @@ import java.util.Optional; import java.util.function.Function; -import io.aklivity.zilla.runtime.binding.http.kafka.internal.config.HttpKafkaWithFetchFilterConfig; - public final class HttpKafkaWithFetchConfig { public final String topic; diff --git a/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchConfigBuilder.java b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchConfigBuilder.java index 7edaec2ba0..21a86f6007 100644 --- a/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchConfigBuilder.java +++ b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchConfigBuilder.java @@ -17,7 +17,6 @@ import java.util.List; import java.util.function.Function; -import io.aklivity.zilla.runtime.binding.http.kafka.internal.config.HttpKafkaWithFetchFilterConfig; import io.aklivity.zilla.runtime.engine.config.ConfigBuilder; public final class HttpKafkaWithFetchConfigBuilder extends ConfigBuilder> diff --git a/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithFetchFilterConfig.java b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterConfig.java similarity index 66% rename from runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithFetchFilterConfig.java rename to runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterConfig.java index f394382a41..1d250860af 100644 --- a/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithFetchFilterConfig.java +++ b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterConfig.java @@ -12,10 +12,11 @@ * WARRANTIES OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package io.aklivity.zilla.runtime.binding.http.kafka.internal.config; +package io.aklivity.zilla.runtime.binding.http.kafka.config; import java.util.List; import java.util.Optional; +import java.util.function.Function; public final class HttpKafkaWithFetchFilterConfig { @@ -29,4 +30,15 @@ public HttpKafkaWithFetchFilterConfig( this.key = Optional.ofNullable(key); this.headers = Optional.ofNullable(headers); } + + public static HttpKafkaWithFetchFilterConfigBuilder builder() + { + return new HttpKafkaWithFetchFilterConfigBuilder<>(HttpKafkaWithFetchFilterConfig.class::cast); + } + + public static HttpKafkaWithFetchFilterConfigBuilder builder( + Function mapper) + { + return new HttpKafkaWithFetchFilterConfigBuilder<>(mapper); + } } diff --git a/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterConfigBuilder.java b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterConfigBuilder.java new file mode 100644 index 0000000000..de99fb29c3 --- /dev/null +++ b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterConfigBuilder.java @@ -0,0 +1,62 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.http.kafka.config; + +import java.util.List; +import java.util.function.Function; + +import io.aklivity.zilla.runtime.engine.config.ConfigBuilder; + +public final class HttpKafkaWithFetchFilterConfigBuilder extends + ConfigBuilder> +{ + private final Function mapper; + private String key; + private List headers; + + + HttpKafkaWithFetchFilterConfigBuilder( + Function mapper) + { + this.mapper = mapper; + } + + public HttpKafkaWithFetchFilterConfigBuilder key( + String key) + { + this.key = key; + return this; + } + + public HttpKafkaWithFetchFilterConfigBuilder headers( + List headers) + { + this.headers = headers; + return this; + } + + @Override + @SuppressWarnings("unchecked") + protected Class> thisType() + { + return (Class>) getClass(); + } + + @Override + public T build() + { + return mapper.apply(new HttpKafkaWithFetchFilterConfig(key, headers)); + } +} diff --git a/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithFetchFilterHeaderConfig.java b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterHeaderConfig.java similarity index 60% rename from runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithFetchFilterHeaderConfig.java rename to runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterHeaderConfig.java index 285ee6cfd4..8a5239bdfd 100644 --- a/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithFetchFilterHeaderConfig.java +++ b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterHeaderConfig.java @@ -12,7 +12,9 @@ * WARRANTIES OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package io.aklivity.zilla.runtime.binding.http.kafka.internal.config; +package io.aklivity.zilla.runtime.binding.http.kafka.config; + +import java.util.function.Function; public final class HttpKafkaWithFetchFilterHeaderConfig { @@ -26,4 +28,15 @@ public HttpKafkaWithFetchFilterHeaderConfig( this.name = name; this.value = value; } + + public static HttpKafkaWithFetchFilterHeaderConfigBuilder builder() + { + return new HttpKafkaWithFetchFilterHeaderConfigBuilder<>(HttpKafkaWithFetchFilterHeaderConfig.class::cast); + } + + public static HttpKafkaWithFetchFilterHeaderConfigBuilder builder( + Function mapper) + { + return new HttpKafkaWithFetchFilterHeaderConfigBuilder<>(mapper); + } } diff --git a/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterHeaderConfigBuilder.java b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterHeaderConfigBuilder.java new file mode 100644 index 0000000000..030b358117 --- /dev/null +++ b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterHeaderConfigBuilder.java @@ -0,0 +1,61 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.http.kafka.config; + +import java.util.function.Function; + +import io.aklivity.zilla.runtime.engine.config.ConfigBuilder; + +public final class HttpKafkaWithFetchFilterHeaderConfigBuilder extends + ConfigBuilder> +{ + private final Function mapper; + private String name; + private String value; + + + HttpKafkaWithFetchFilterHeaderConfigBuilder( + Function mapper) + { + this.mapper = mapper; + } + + public HttpKafkaWithFetchFilterHeaderConfigBuilder name( + String name) + { + this.name = name; + return this; + } + + public HttpKafkaWithFetchFilterHeaderConfigBuilder value( + String value) + { + this.value = value; + return this; + } + + @Override + @SuppressWarnings("unchecked") + protected Class> thisType() + { + return (Class>) getClass(); + } + + @Override + public T build() + { + return mapper.apply(new HttpKafkaWithFetchFilterHeaderConfig(name, value)); + } +} diff --git a/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithFetchConfigAdapter.java b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithFetchConfigAdapter.java index e8edee9089..14fcbf9b70 100644 --- a/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithFetchConfigAdapter.java +++ b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithFetchConfigAdapter.java @@ -26,6 +26,8 @@ import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithConfig; import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchConfig; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchFilterConfig; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchFilterHeaderConfig; import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchMergeConfig; public final class HttpKafkaWithFetchConfigAdapter implements JsonbAdapter diff --git a/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithResolver.java b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithResolver.java index 498abb03d4..7872ccef48 100644 --- a/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithResolver.java +++ b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithResolver.java @@ -31,6 +31,8 @@ import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaOptionsConfig; import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithConfig; import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchConfig; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchFilterConfig; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchFilterHeaderConfig; import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchMergeConfig; import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceAsyncHeaderConfig; import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceConfig; diff --git a/runtime/binding-http-kafka/src/test/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithConfigAdapterTest.java b/runtime/binding-http-kafka/src/test/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithConfigAdapterTest.java index 4ec1da57ac..67db84b92b 100644 --- a/runtime/binding-http-kafka/src/test/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithConfigAdapterTest.java +++ b/runtime/binding-http-kafka/src/test/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithConfigAdapterTest.java @@ -37,6 +37,8 @@ import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithConfig; import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchConfig; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchFilterConfig; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchFilterHeaderConfig; import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchMergeConfig; import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceAsyncHeaderConfig; import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceConfig; From f2c46d8db9f7d45011dc16ad748a3db584f0298b Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 14 Mar 2024 11:57:06 -0700 Subject: [PATCH 2/5] Fix remaining issues --- .../OpenapiAsyncCompositeBindingAdapter.java | 15 +++++++-------- .../config/HttpKafkaWithFetchConfigAdapter.java | 10 ++++++++-- .../config/HttpKafkaWithConfigAdapterTest.java | 12 +++++++----- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java b/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java index dcbbabac85..283ccca251 100644 --- a/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java +++ b/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java @@ -15,7 +15,6 @@ package io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config; import static io.aklivity.zilla.runtime.engine.config.KindConfig.PROXY; -import static java.util.Collections.emptyList; import static java.util.stream.Collectors.toList; import java.util.List; @@ -194,15 +193,15 @@ private HttpKafkaWithFetchConfigBuilder injectHttpKafkaRouteFetchWith( .build()); break merge; } + } - if (includeKey) - { - fetch.filters(List.of(HttpKafkaWithFetchFilterConfig.builder() - .key("${params.id}") - .headers(emptyList()) - .build())); - } + if (includeKey) + { + fetch.filters(List.of(HttpKafkaWithFetchFilterConfig.builder() + .key("${params.id}") + .build())); } + return fetch; } diff --git a/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithFetchConfigAdapter.java b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithFetchConfigAdapter.java index 14fcbf9b70..f48d6031a9 100644 --- a/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithFetchConfigAdapter.java +++ b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithFetchConfigAdapter.java @@ -144,11 +144,17 @@ public HttpKafkaWithConfig adaptFromJson( for (String newHeaderName : headers.keySet()) { String newHeaderValue = headers.getString(newHeaderName); - newHeaders.add(new HttpKafkaWithFetchFilterHeaderConfig(newHeaderName, newHeaderValue)); + newHeaders.add(HttpKafkaWithFetchFilterHeaderConfig.builder() + .name(newHeaderName) + .value(newHeaderValue) + .build()); } } - newFilters.add(new HttpKafkaWithFetchFilterConfig(newKey, newHeaders)); + newFilters.add(HttpKafkaWithFetchFilterConfig.builder() + .key(newKey) + .headers(newHeaders) + .build()); } } diff --git a/runtime/binding-http-kafka/src/test/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithConfigAdapterTest.java b/runtime/binding-http-kafka/src/test/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithConfigAdapterTest.java index 67db84b92b..ce90a60f6a 100644 --- a/runtime/binding-http-kafka/src/test/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithConfigAdapterTest.java +++ b/runtime/binding-http-kafka/src/test/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithConfigAdapterTest.java @@ -131,11 +131,13 @@ public void shouldWriteWithFetchTopicAndFilters() HttpKafkaWithConfig with = HttpKafkaWithConfig.builder() .fetch(HttpKafkaWithFetchConfig.builder() .topic("test") - .filters(singletonList(new HttpKafkaWithFetchFilterConfig( - "fixed-key", - singletonList(new HttpKafkaWithFetchFilterHeaderConfig( - "tag", - "fixed-tag"))))) + .filters(singletonList(HttpKafkaWithFetchFilterConfig.builder() + .key("fixed-key") + .headers(singletonList(HttpKafkaWithFetchFilterHeaderConfig.builder() + .name("tag") + .value("fixed-tag") + .build())) + .build())) .merged(null) .build()) .build(); From b0aab68344f1e5283f91463b2cf62ab7cdbf7951 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 14 Mar 2024 13:25:46 -0700 Subject: [PATCH 3/5] Apply feedback from PR --- .../OpenapiAsyncCompositeBindingAdapter.java | 24 ++++++++++++------- ...HttpKafkaWithFetchFilterConfigBuilder.java | 18 ++++++++++++++ .../HttpKafkaWithConfigAdapterTest.java | 7 +----- 3 files changed, 34 insertions(+), 15 deletions(-) diff --git a/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java b/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java index 283ccca251..c11a989192 100644 --- a/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java +++ b/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java @@ -17,6 +17,7 @@ import static io.aklivity.zilla.runtime.engine.config.KindConfig.PROXY; import static java.util.stream.Collectors.toList; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.regex.Matcher; @@ -50,10 +51,10 @@ public final class OpenapiAsyncCompositeBindingAdapter implements CompositeBindingAdapterSpi { private static final Pattern JSON_CONTENT_TYPE = Pattern.compile("^application/(?:.+\\+)?json$"); - private static final Pattern PATH_ID_PATTERN = Pattern.compile("\\{id\\}"); + private static final Pattern PARAMETER_PATTERN = Pattern.compile("\\{([^}]*)\\}"); private final Matcher jsonContentType = JSON_CONTENT_TYPE.matcher(""); - private final Matcher pathId = PATH_ID_PATTERN.matcher(""); + private final Matcher parameters = PARAMETER_PATTERN.matcher(""); @Override public String type() @@ -122,7 +123,12 @@ private BindingConfigBuilder injectHttpKafkaRoutes( final AsyncapiChannelView channel = AsyncapiChannelView .of(asyncapi.channels, asyncapiOperation.channel); - final boolean includeKey = pathId.reset(item).find(); + List paramNames = new ArrayList<>(); + Matcher matcher = parameters.reset(item); + while (matcher.find()) + { + paramNames.add(parameters.group(1)); + } binding .route() @@ -132,7 +138,7 @@ private BindingConfigBuilder injectHttpKafkaRoutes( .path(item) .build() .inject(r -> injectHttpKafkaRouteWith(r, openapi, openapiOperation, - asyncapiOperation.action, channel.address(), includeKey)) + asyncapiOperation.action, channel.address(), paramNames)) .build(); } } @@ -148,7 +154,7 @@ private RouteConfigBuilder injectHttpKafkaRouteWith( OpenapiOperation operation, String action, String address, - boolean includeKey) + List paramNames) { final HttpKafkaWithConfigBuilder newWith = HttpKafkaWithConfig.builder(); @@ -157,7 +163,7 @@ private RouteConfigBuilder injectHttpKafkaRouteWith( case "receive": newWith.fetch(HttpKafkaWithFetchConfig.builder() .topic(address) - .inject(with -> this.injectHttpKafkaRouteFetchWith(with, openapi, operation, includeKey)) + .inject(with -> this.injectHttpKafkaRouteFetchWith(with, openapi, operation, paramNames)) .build()); break; case "send": @@ -177,7 +183,7 @@ private HttpKafkaWithFetchConfigBuilder injectHttpKafkaRouteFetchWith( HttpKafkaWithFetchConfigBuilder fetch, Openapi openapi, OpenapiOperation operation, - boolean includeKey) + List paramNames) { merge: for (Map.Entry response : operation.responses.entrySet()) @@ -195,10 +201,10 @@ private HttpKafkaWithFetchConfigBuilder injectHttpKafkaRouteFetchWith( } } - if (includeKey) + if (!paramNames.isEmpty()) { fetch.filters(List.of(HttpKafkaWithFetchFilterConfig.builder() - .key("${params.id}") + .key(String.format("${params.%s}", paramNames.get(0))) .build())); } diff --git a/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterConfigBuilder.java b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterConfigBuilder.java index de99fb29c3..6f6807f153 100644 --- a/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterConfigBuilder.java +++ b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterConfigBuilder.java @@ -14,6 +14,7 @@ */ package io.aklivity.zilla.runtime.binding.http.kafka.config; +import java.util.ArrayList; import java.util.List; import java.util.function.Function; @@ -47,6 +48,23 @@ public HttpKafkaWithFetchFilterConfigBuilder headers( return this; } + public HttpKafkaWithFetchFilterConfigBuilder header( + String name, + String value) + { + if (headers != null) + { + headers = new ArrayList<>(); + } + + headers.add(HttpKafkaWithFetchFilterHeaderConfig.builder() + .name(name) + .value(value) + .build()); + + return this; + } + @Override @SuppressWarnings("unchecked") protected Class> thisType() diff --git a/runtime/binding-http-kafka/src/test/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithConfigAdapterTest.java b/runtime/binding-http-kafka/src/test/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithConfigAdapterTest.java index ce90a60f6a..322b3825c9 100644 --- a/runtime/binding-http-kafka/src/test/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithConfigAdapterTest.java +++ b/runtime/binding-http-kafka/src/test/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithConfigAdapterTest.java @@ -18,7 +18,6 @@ import static com.github.npathai.hamcrestopt.OptionalMatchers.isPresent; import static com.github.npathai.hamcrestopt.OptionalMatchers.isPresentAnd; import static com.vtence.hamcrest.jpa.HasFieldWithValue.hasField; -import static io.aklivity.zilla.runtime.binding.http.kafka.internal.types.KafkaAckMode.LEADER_ONLY; import static java.util.Collections.singletonList; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; @@ -38,7 +37,6 @@ import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithConfig; import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchConfig; import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchFilterConfig; -import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchFilterHeaderConfig; import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchMergeConfig; import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceAsyncHeaderConfig; import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceConfig; @@ -133,10 +131,7 @@ public void shouldWriteWithFetchTopicAndFilters() .topic("test") .filters(singletonList(HttpKafkaWithFetchFilterConfig.builder() .key("fixed-key") - .headers(singletonList(HttpKafkaWithFetchFilterHeaderConfig.builder() - .name("tag") - .value("fixed-tag") - .build())) + .header("tag", "fixed-tag") .build())) .merged(null) .build()) From b01a150f8b5a42b7e1616cc39ae5311d996556fa Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 14 Mar 2024 13:35:55 -0700 Subject: [PATCH 4/5] require at least one character --- .../internal/config/OpenapiAsyncCompositeBindingAdapter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java b/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java index c11a989192..cbf80cc559 100644 --- a/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java +++ b/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java @@ -51,7 +51,7 @@ public final class OpenapiAsyncCompositeBindingAdapter implements CompositeBindingAdapterSpi { private static final Pattern JSON_CONTENT_TYPE = Pattern.compile("^application/(?:.+\\+)?json$"); - private static final Pattern PARAMETER_PATTERN = Pattern.compile("\\{([^}]*)\\}"); + private static final Pattern PARAMETER_PATTERN = Pattern.compile("\\{([^}]+)\\}"); private final Matcher jsonContentType = JSON_CONTENT_TYPE.matcher(""); private final Matcher parameters = PARAMETER_PATTERN.matcher(""); From 4c0901de6f2515ab2730f1dee180cfaa21bd239d Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 14 Mar 2024 14:03:20 -0700 Subject: [PATCH 5/5] Fix typo --- .../kafka/config/HttpKafkaWithFetchFilterConfigBuilder.java | 2 +- .../kafka/internal/config/HttpKafkaWithConfigAdapterTest.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterConfigBuilder.java b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterConfigBuilder.java index 6f6807f153..248672e0fd 100644 --- a/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterConfigBuilder.java +++ b/runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithFetchFilterConfigBuilder.java @@ -52,7 +52,7 @@ public HttpKafkaWithFetchFilterConfigBuilder header( String name, String value) { - if (headers != null) + if (headers == null) { headers = new ArrayList<>(); } diff --git a/runtime/binding-http-kafka/src/test/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithConfigAdapterTest.java b/runtime/binding-http-kafka/src/test/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithConfigAdapterTest.java index 322b3825c9..8dac67f72c 100644 --- a/runtime/binding-http-kafka/src/test/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithConfigAdapterTest.java +++ b/runtime/binding-http-kafka/src/test/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithConfigAdapterTest.java @@ -18,6 +18,7 @@ import static com.github.npathai.hamcrestopt.OptionalMatchers.isPresent; import static com.github.npathai.hamcrestopt.OptionalMatchers.isPresentAnd; import static com.vtence.hamcrest.jpa.HasFieldWithValue.hasField; +import static io.aklivity.zilla.runtime.binding.http.kafka.internal.types.KafkaAckMode.LEADER_ONLY; import static java.util.Collections.singletonList; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf;