From 0e3367a5839dd0a0759dbfed8a88c6b4824e384a Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Tue, 20 Jun 2023 21:45:02 +0500 Subject: [PATCH 1/9] Checkpoint --- .../internal/stream/SseKafkaProxyFactory.java | 41 +++++++++-------- .../internal/stream/SseKafkaProxyIT.java | 10 ++++ .../kafka/internal/KafkaFunctions.java | 6 +++ .../client.rpt | 40 ++++++++++++++++ .../server.rpt | 46 +++++++++++++++++++ .../client.rpt | 35 ++++++++++++++ .../server.rpt | 38 +++++++++++++++ 7 files changed, 198 insertions(+), 18 deletions(-) create mode 100644 specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.messages.with.null.etag/client.rpt create mode 100644 specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.messages.with.null.etag/server.rpt create mode 100644 specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.messages.with.null.etag/client.rpt create mode 100644 specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.messages.with.null.etag/server.rpt diff --git a/runtime/binding-sse-kafka/src/main/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyFactory.java b/runtime/binding-sse-kafka/src/main/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyFactory.java index a8e84edac7..8234718f51 100644 --- a/runtime/binding-sse-kafka/src/main/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyFactory.java +++ b/runtime/binding-sse-kafka/src/main/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyFactory.java @@ -643,28 +643,33 @@ private void onKafkaData( } else { + String8FW encodedId = null; + OctetsFW key = null; final int flags = data.flags(); final OctetsFW payload = data.payload(); - final OctetsFW extension = data.extension(); - final ExtensionFW dataEx = extension.get(extensionRO::tryWrap); - final KafkaDataExFW kafkaDataEx = - dataEx != null && dataEx.typeId() == kafkaTypeId ? extension.get(kafkaDataExRO::tryWrap) : null; - final KafkaMergedDataExFW kafkaMergedDataEx = - kafkaDataEx != null && kafkaDataEx.kind() == KafkaDataExFW.KIND_MERGED ? kafkaDataEx.merged() : null; - final Array32FW progress = kafkaMergedDataEx != null ? kafkaMergedDataEx.progress() : null; - final OctetsFW key = kafkaMergedDataEx != null ? kafkaMergedDataEx.key().value() : null; - final Array32FW headers = kafkaMergedDataEx != null ? kafkaMergedDataEx.headers() : null; - final KafkaHeaderFW etag = headers.matchFirst(h -> HEADER_NAME_ETAG.value().equals(h.name().value())); - String8FW encodedId = null; - switch (delegate.resolved.eventId()) + if ((flags & 0x02) != 0x00) { - case EVENT_ID_KEY64_AND_ETAG: - encodedId = sseEventId.encodeKeyAndProgress(key, progress, etag); - break; - case EVENT_ID_ETAG_ONLY: - encodedId = sseEventId.encodeProgressOnly(progress, etag); - break; + final OctetsFW extension = data.extension(); + final ExtensionFW dataEx = extension.get(extensionRO::tryWrap); + final KafkaDataExFW kafkaDataEx = + dataEx != null && dataEx.typeId() == kafkaTypeId ? extension.get(kafkaDataExRO::tryWrap) : null; + final KafkaMergedDataExFW kafkaMergedDataEx = + kafkaDataEx != null && kafkaDataEx.kind() == KafkaDataExFW.KIND_MERGED ? kafkaDataEx.merged() : null; + final Array32FW progress = kafkaMergedDataEx != null ? kafkaMergedDataEx.progress() : null; + key = kafkaMergedDataEx != null ? kafkaMergedDataEx.key().value() : null; + final Array32FW headers = kafkaMergedDataEx != null ? kafkaMergedDataEx.headers() : null; + final KafkaHeaderFW etag = headers.matchFirst(h -> HEADER_NAME_ETAG.value().equals(h.name().value())); + + switch (delegate.resolved.eventId()) + { + case EVENT_ID_KEY64_AND_ETAG: + encodedId = sseEventId.encodeKeyAndProgress(key, progress, etag); + break; + case EVENT_ID_ETAG_ONLY: + encodedId = sseEventId.encodeProgressOnly(progress, etag); + break; + } } final String8FW eventType = payload == null ? EVENT_TYPE_DELETE : EVENT_TYPE_MESSAGE; diff --git a/runtime/binding-sse-kafka/src/test/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyIT.java b/runtime/binding-sse-kafka/src/test/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyIT.java index 7d858b540b..d8b363ddb5 100644 --- a/runtime/binding-sse-kafka/src/test/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyIT.java +++ b/runtime/binding-sse-kafka/src/test/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyIT.java @@ -157,6 +157,16 @@ public void shouldReceiveServerSentMessagesWithEtag() throws Exception k3po.finish(); } + @Test + @Configuration("proxy.with.topic.yaml") + @Specification({ + "${sse}/server.sent.messages.with.null.etag/client", + "${kafka}/server.sent.messages.with.null.etag/server"}) + public void shouldReceiveServerSentMessagesWithNullEtag() throws Exception + { + k3po.finish(); + } + @Test @Configuration("proxy.with.topic.and.event.id.yaml") @Specification({ diff --git a/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java b/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java index a38ebaa231..ae3dbf668e 100644 --- a/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java +++ b/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java @@ -397,6 +397,7 @@ public KafkaFilterBuilder header( String name, String value) { + if (value == null) { nameRO.wrap(name.getBytes(UTF_8)); @@ -1338,6 +1339,11 @@ public KafkaMergedDataExBuilder header( return this; } + public KafkaMergedDataExBuilder headersNull() + { + return this; + } + public KafkaMergedDataExBuilder headerNull( String name) { diff --git a/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.messages.with.null.etag/client.rpt b/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.messages.with.null.etag/client.rpt new file mode 100644 index 0000000000..bf77e391ff --- /dev/null +++ b/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.messages.with.null.etag/client.rpt @@ -0,0 +1,40 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("test") + .partition(-1, -2) + .build() + .build()} + +connected + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .partition(0, 1, 2) + .progress(0, 2) + .progress(1, 1) + .key("key") + .build() + .build()} +read "Hello, world" diff --git a/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.messages.with.null.etag/server.rpt b/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.messages.with.null.etag/server.rpt new file mode 100644 index 0000000000..5edc852663 --- /dev/null +++ b/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.messages.with.null.etag/server.rpt @@ -0,0 +1,46 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("test") + .partition(-1, -2) + .build() + .build()} + +connected + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .timestamp(kafka:timestamp()) + .partition(0, 1, 2) + .progress(0, 2) + .progress(1, 1) + .key("key") + .headersNull() + .build() + .build()} +write "Hello, world" +write flush + diff --git a/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.messages.with.null.etag/client.rpt b/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.messages.with.null.etag/client.rpt new file mode 100644 index 0000000000..f9e0a9f1cb --- /dev/null +++ b/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.messages.with.null.etag/client.rpt @@ -0,0 +1,35 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/sse0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${sse:beginEx() + .typeId(zilla:id("sse")) + .scheme("http") + .authority("localhost:8080") + .path("/test") + .lastId(null) + .build()} + +connected + +read zilla:data.ext ${sse:matchDataEx() + .typeId(zilla:id("sse")) + .id(null) + .type(null) + .build()} +read "Hello, world" diff --git a/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.messages.with.null.etag/server.rpt b/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.messages.with.null.etag/server.rpt new file mode 100644 index 0000000000..aa72ed4181 --- /dev/null +++ b/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.messages.with.null.etag/server.rpt @@ -0,0 +1,38 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/sse0" + option zilla:window 8192 + option zilla:transmission "duplex" +accepted + +read zilla:begin.ext ${sse:beginEx() + .typeId(zilla:id("sse")) + .scheme("http") + .authority("localhost:8080") + .path("/test") + .lastId(null) + .build()} + +connected + +write flush + +write zilla:data.ext ${sse:dataEx() + .typeId(zilla:id("sse")) + .id("AQQABAIC/revision=42") + .build()} +write "Hello, world" + From 3d57aa35076cc4e14d47141f077c9a06768811f2 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 22 Jun 2023 19:43:37 +0500 Subject: [PATCH 2/9] Checkpoint --- .../internal/budget/KafkaMergedBudget.java | 2 ++ .../internal/stream/KafkaMergedFactory.java | 36 ++++++++++++------- .../internal/stream/SseKafkaProxyFactory.java | 7 +++- .../internal/stream/SseKafkaProxyIT.java | 6 ++-- .../runtime/engine/EngineConfiguration.java | 2 +- .../client.rpt | 0 .../server.rpt | 6 ++-- .../client.rpt | 4 +-- .../server.rpt | 0 9 files changed, 41 insertions(+), 22 deletions(-) rename specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/{server.sent.messages.with.null.etag => server.sent.100k.message}/client.rpt (100%) rename specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/{server.sent.messages.with.null.etag => server.sent.100k.message}/server.rpt (93%) rename specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/{server.sent.messages.with.null.etag => server.sent.100k.message}/client.rpt (95%) rename specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/{server.sent.messages.with.null.etag => server.sent.100k.message}/server.rpt (100%) diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/budget/KafkaMergedBudget.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/budget/KafkaMergedBudget.java index 1fc131c67b..b2c923b26c 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/budget/KafkaMergedBudget.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/budget/KafkaMergedBudget.java @@ -62,6 +62,7 @@ long credit( final long budgetSnapshot = budget; budget += credit; + System.out.println("Budget Credit " + budget); flush(traceId); @@ -100,6 +101,7 @@ else if (claimed < minimum) if (claimed >= minimum) { budget -= claimed; + System.out.println("Budget Claim " + budget); } final int watcherAt = watchers.indexOf(watcherId); diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java index 96ca2ab9ac..c3ccbd0002 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java @@ -1023,6 +1023,7 @@ private final class KafkaMergedStream private long initialAck; private int initialMax; + private long replyReplyBudget; private long replySeq; private long replyAck; private int replyMax; @@ -1339,26 +1340,31 @@ private void onMergedReplyWindow( final int credit = (replyNoAck - newReplyNoAck) + (maximum - replyMax); assert credit >= 0; + System.out.println(String.format("onMergedReplyWindow %d+%d=%d", replyReplyBudget, credit, + replyReplyBudget + credit)); + this.replyAck = acknowledge; this.replyMax = maximum; this.replyMin = minimum; this.replyPad = padding; this.replyBudgetId = budgetId; - assert replyAck <= replySeq; - - if (KafkaState.replyOpening(state)) + if (replyReplyBudget + credit <= replyMax) { - state = KafkaState.openedReply(state); - if (mergedReplyBudgetId == NO_CREDITOR_INDEX) + if (KafkaState.replyOpening(state)) { - mergedReplyBudgetId = creditor.acquire(replyId, budgetId); + state = KafkaState.openedReply(state); + if (mergedReplyBudgetId == NO_CREDITOR_INDEX) + { + mergedReplyBudgetId = creditor.acquire(replyId, budgetId); + } } - } - if (mergedReplyBudgetId != NO_CREDITOR_INDEX) - { - creditor.credit(traceId, mergedReplyBudgetId, credit); + if (mergedReplyBudgetId != NO_CREDITOR_INDEX) + { + creditor.credit(traceId, mergedReplyBudgetId, credit); + replyReplyBudget += credit; + } } doUnmergedFetchReplyWindowsIfNecessary(traceId); @@ -1433,6 +1439,7 @@ private void doMergedReplyBegin( { mergedReplyBudgetId = creditor.acquire(replyId, replyBudgetId); creditor.credit(traceId, mergedReplyBudgetId, replyBudget); + replyReplyBudget += replyBudget; } } @@ -1525,6 +1532,8 @@ private void doMergedReplyData( traceId, authorization, replyBudgetId, reserved, flags, payload, newKafkaDataEx); replySeq += reserved; + replyReplyBudget -= reserved; + System.out.println(String.format("doMergedReplyData %d", replyReplyBudget)); assert replyAck <= replySeq; } @@ -1619,9 +1628,9 @@ private void doMergedReplyEndIfNecessary( } private void doMergedReplyFlush( - long traceId, - int reserved, - KafkaFlushExFW kafkaFlushEx) + long traceId, + int reserved, + KafkaFlushExFW kafkaFlushEx) { final KafkaFetchFlushExFW kafkaFetchFlushEx = kafkaFlushEx.fetch(); kafkaFetchFlushEx.partition().partitionId(); @@ -1652,6 +1661,7 @@ private void doMergedReplyFlush( traceId, authorization, reserved, kafkaFlushExFW); replySeq += reserved; + replyReplyBudget -= reserved; assert replyAck <= replySeq; } diff --git a/runtime/binding-sse-kafka/src/main/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyFactory.java b/runtime/binding-sse-kafka/src/main/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyFactory.java index 8234718f51..8c1fd03597 100644 --- a/runtime/binding-sse-kafka/src/main/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyFactory.java +++ b/runtime/binding-sse-kafka/src/main/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyFactory.java @@ -55,6 +55,7 @@ public final class SseKafkaProxyFactory implements SseKafkaStreamFactory { + private static final int INIT_FLAG = 0x02; private static final String SSE_TYPE_NAME = "sse"; private static final String KAFKA_TYPE_NAME = "kafka"; @@ -634,6 +635,8 @@ private void onKafkaData( replySeq = sequence + reserved; + System.out.println(String.format("Sse budget ondata %d", replyMax - (int)(replySeq - replyAck))); + assert replyAck <= replySeq; if (replySeq > replyAck + replyMax) @@ -648,7 +651,7 @@ private void onKafkaData( final int flags = data.flags(); final OctetsFW payload = data.payload(); - if ((flags & 0x02) != 0x00) + if ((flags & INIT_FLAG) != 0x00) { final OctetsFW extension = data.extension(); final ExtensionFW dataEx = extension.get(extensionRO::tryWrap); @@ -811,6 +814,8 @@ private void doKafkaWindow( doWindow(kafka, originId, routedId, replyId, replySeq, replyAck, replyMax, traceId, authorization, budgetId, padding, minimum, capabilities); + + System.out.println(String.format("Sse budget doKafkaWindow %d", replyMax - (int)(replySeq - replyAck))); } } diff --git a/runtime/binding-sse-kafka/src/test/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyIT.java b/runtime/binding-sse-kafka/src/test/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyIT.java index d8b363ddb5..69b6b85b1b 100644 --- a/runtime/binding-sse-kafka/src/test/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyIT.java +++ b/runtime/binding-sse-kafka/src/test/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyIT.java @@ -160,9 +160,9 @@ public void shouldReceiveServerSentMessagesWithEtag() throws Exception @Test @Configuration("proxy.with.topic.yaml") @Specification({ - "${sse}/server.sent.messages.with.null.etag/client", - "${kafka}/server.sent.messages.with.null.etag/server"}) - public void shouldReceiveServerSentMessagesWithNullEtag() throws Exception + "${sse}/server.sent.100k.message/client", + "${kafka}/server.sent.100k.message/server"}) + public void shouldReceiveServerSent100kMessage() throws Exception { k3po.finish(); } diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java index 6bdc7840ec..4353f19dbe 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java @@ -85,7 +85,7 @@ public class EngineConfiguration extends Configuration EngineConfiguration::decodeHostResolver, EngineConfiguration::defaultHostResolver); ENGINE_BUDGETS_BUFFER_CAPACITY = config.property("budgets.buffer.capacity", 1024 * 1024); ENGINE_LOAD_BUFFER_CAPACITY = config.property("load.buffer.capacity", 1024 * 8); - ENGINE_STREAMS_BUFFER_CAPACITY = config.property("streams.buffer.capacity", 1024 * 1024); + ENGINE_STREAMS_BUFFER_CAPACITY = config.property("streams.buffer.capacity", 2097152); ENGINE_COMMAND_BUFFER_CAPACITY = config.property("command.buffer.capacity", 1024 * 1024); ENGINE_RESPONSE_BUFFER_CAPACITY = config.property("response.buffer.capacity", 1024 * 1024); ENGINE_COUNTERS_BUFFER_CAPACITY = config.property("counters.buffer.capacity", 1024 * 1024); diff --git a/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.messages.with.null.etag/client.rpt b/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.100k.message/client.rpt similarity index 100% rename from specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.messages.with.null.etag/client.rpt rename to specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.100k.message/client.rpt diff --git a/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.messages.with.null.etag/server.rpt b/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.100k.message/server.rpt similarity index 93% rename from specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.messages.with.null.etag/server.rpt rename to specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.100k.message/server.rpt index 5edc852663..5fa84dbb50 100644 --- a/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.messages.with.null.etag/server.rpt +++ b/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.100k.message/server.rpt @@ -13,6 +13,8 @@ # specific language governing permissions and limitations under the License. # +property data ${http:randomBytes(100000)} + accept "zilla://streams/kafka0" option zilla:window 8192 option zilla:transmission "duplex" @@ -38,9 +40,9 @@ write zilla:data.ext ${kafka:dataEx() .progress(0, 2) .progress(1, 1) .key("key") - .headersNull() + .header("header", "value") .build() .build()} -write "Hello, world" +write ${data} write flush diff --git a/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.messages.with.null.etag/client.rpt b/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.100k.message/client.rpt similarity index 95% rename from specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.messages.with.null.etag/client.rpt rename to specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.100k.message/client.rpt index f9e0a9f1cb..e81c2a24b9 100644 --- a/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.messages.with.null.etag/client.rpt +++ b/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.100k.message/client.rpt @@ -29,7 +29,7 @@ connected read zilla:data.ext ${sse:matchDataEx() .typeId(zilla:id("sse")) - .id(null) + .id("AQQABAIC") .type(null) .build()} -read "Hello, world" +read [0..100000] diff --git a/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.messages.with.null.etag/server.rpt b/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.100k.message/server.rpt similarity index 100% rename from specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.messages.with.null.etag/server.rpt rename to specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.100k.message/server.rpt From ff5fe69c59fa0ef82418176448c2aa6e1ea304ff Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 23 Jun 2023 17:35:37 +0500 Subject: [PATCH 3/9] Add spec test --- .../internal/budget/KafkaMergedBudget.java | 2 -- .../internal/stream/KafkaMergedFactory.java | 36 +++++++------------ .../kafka/server.sent.100k.message/client.rpt | 33 ++++++++--------- .../kafka/server.sent.100k.message/server.rpt | 2 +- .../sse/server.sent.100k.message/server.rpt | 26 +++++++------- .../binding/sse/kafka/streams/KafkaIT.java | 9 +++++ .../binding/sse/kafka/streams/SseIT.java | 9 +++++ 7 files changed, 62 insertions(+), 55 deletions(-) diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/budget/KafkaMergedBudget.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/budget/KafkaMergedBudget.java index b2c923b26c..1fc131c67b 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/budget/KafkaMergedBudget.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/budget/KafkaMergedBudget.java @@ -62,7 +62,6 @@ long credit( final long budgetSnapshot = budget; budget += credit; - System.out.println("Budget Credit " + budget); flush(traceId); @@ -101,7 +100,6 @@ else if (claimed < minimum) if (claimed >= minimum) { budget -= claimed; - System.out.println("Budget Claim " + budget); } final int watcherAt = watchers.indexOf(watcherId); diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java index c3ccbd0002..1f15213e5c 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java @@ -1023,7 +1023,6 @@ private final class KafkaMergedStream private long initialAck; private int initialMax; - private long replyReplyBudget; private long replySeq; private long replyAck; private int replyMax; @@ -1335,13 +1334,8 @@ private void onMergedReplyWindow( assert acknowledge >= replyAck; assert maximum >= replyMax; - final int replyNoAck = (int)(replySeq - replyAck); - final int newReplyNoAck = (int)(sequence - acknowledge); - final int credit = (replyNoAck - newReplyNoAck) + (maximum - replyMax); - assert credit >= 0; - - System.out.println(String.format("onMergedReplyWindow %d+%d=%d", replyReplyBudget, credit, - replyReplyBudget + credit)); + final int credit = (int)(acknowledge - replyAck) + (maximum - replyMax); + System.out.println(credit); this.replyAck = acknowledge; this.replyMax = maximum; @@ -1349,22 +1343,20 @@ private void onMergedReplyWindow( this.replyPad = padding; this.replyBudgetId = budgetId; - if (replyReplyBudget + credit <= replyMax) + assert replyAck <= replySeq; + + if (KafkaState.replyOpening(state)) { - if (KafkaState.replyOpening(state)) + state = KafkaState.openedReply(state); + if (mergedReplyBudgetId == NO_CREDITOR_INDEX) { - state = KafkaState.openedReply(state); - if (mergedReplyBudgetId == NO_CREDITOR_INDEX) - { - mergedReplyBudgetId = creditor.acquire(replyId, budgetId); - } + mergedReplyBudgetId = creditor.acquire(replyId, budgetId); } + } - if (mergedReplyBudgetId != NO_CREDITOR_INDEX) - { - creditor.credit(traceId, mergedReplyBudgetId, credit); - replyReplyBudget += credit; - } + if (mergedReplyBudgetId != NO_CREDITOR_INDEX) + { + creditor.credit(traceId, mergedReplyBudgetId, credit); } doUnmergedFetchReplyWindowsIfNecessary(traceId); @@ -1439,7 +1431,6 @@ private void doMergedReplyBegin( { mergedReplyBudgetId = creditor.acquire(replyId, replyBudgetId); creditor.credit(traceId, mergedReplyBudgetId, replyBudget); - replyReplyBudget += replyBudget; } } @@ -1532,8 +1523,6 @@ private void doMergedReplyData( traceId, authorization, replyBudgetId, reserved, flags, payload, newKafkaDataEx); replySeq += reserved; - replyReplyBudget -= reserved; - System.out.println(String.format("doMergedReplyData %d", replyReplyBudget)); assert replyAck <= replySeq; } @@ -1661,7 +1650,6 @@ private void doMergedReplyFlush( traceId, authorization, reserved, kafkaFlushExFW); replySeq += reserved; - replyReplyBudget -= reserved; assert replyAck <= replySeq; } diff --git a/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.100k.message/client.rpt b/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.100k.message/client.rpt index bf77e391ff..007d75d046 100644 --- a/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.100k.message/client.rpt +++ b/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.100k.message/client.rpt @@ -18,23 +18,24 @@ connect "zilla://streams/kafka0" option zilla:transmission "duplex" write zilla:begin.ext ${kafka:beginEx() - .typeId(zilla:id("kafka")) - .merged() - .capabilities("FETCH_ONLY") - .topic("test") - .partition(-1, -2) - .build() - .build()} + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("test") + .partition(-1, -2) + .build() + .build()} connected read zilla:data.ext ${kafka:matchDataEx() - .typeId(zilla:id("kafka")) - .merged() - .partition(0, 1, 2) - .progress(0, 2) - .progress(1, 1) - .key("key") - .build() - .build()} -read "Hello, world" + .typeId(zilla:id("kafka")) + .merged() + .partition(0, 1, 2) + .progress(0, 2) + .progress(1, 1) + .key("key") + .header("header", "value") + .build() + .build()} +read [0...100000] diff --git a/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.100k.message/server.rpt b/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.100k.message/server.rpt index 5fa84dbb50..9eb4f91d0f 100644 --- a/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.100k.message/server.rpt +++ b/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/kafka/server.sent.100k.message/server.rpt @@ -21,7 +21,7 @@ accept "zilla://streams/kafka0" accepted -read zilla:begin.ext ${kafka:beginEx() +read zilla:begin.ext ${kafka:matchBeginEx() .typeId(zilla:id("kafka")) .merged() .capabilities("FETCH_ONLY") diff --git a/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.100k.message/server.rpt b/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.100k.message/server.rpt index aa72ed4181..da7b14f93c 100644 --- a/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.100k.message/server.rpt +++ b/specs/binding-sse-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/sse/kafka/streams/sse/server.sent.100k.message/server.rpt @@ -13,26 +13,28 @@ # specific language governing permissions and limitations under the License. # +property data ${http:randomBytes(100000)} + accept "zilla://streams/sse0" option zilla:window 8192 option zilla:transmission "duplex" accepted read zilla:begin.ext ${sse:beginEx() - .typeId(zilla:id("sse")) - .scheme("http") - .authority("localhost:8080") - .path("/test") - .lastId(null) - .build()} + .typeId(zilla:id("sse")) + .scheme("http") + .authority("localhost:8080") + .path("/test") + .lastId(null) + .build()} connected -write flush - write zilla:data.ext ${sse:dataEx() - .typeId(zilla:id("sse")) - .id("AQQABAIC/revision=42") - .build()} -write "Hello, world" + .typeId(zilla:id("sse")) + .id("AQQABAIC") + .type(null) + .build()} +write ${data} +write flush diff --git a/specs/binding-sse-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/sse/kafka/streams/KafkaIT.java b/specs/binding-sse-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/sse/kafka/streams/KafkaIT.java index 922c27061d..aec78b65c9 100644 --- a/specs/binding-sse-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/sse/kafka/streams/KafkaIT.java +++ b/specs/binding-sse-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/sse/kafka/streams/KafkaIT.java @@ -151,4 +151,13 @@ public void shouldReceiveClientSentAbort() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${kafka}/server.sent.100k.message/client", + "${kafka}/server.sent.100k.message/server"}) + public void shouldReceiveServerSent100kMessage() throws Exception + { + k3po.finish(); + } } diff --git a/specs/binding-sse-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/sse/kafka/streams/SseIT.java b/specs/binding-sse-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/sse/kafka/streams/SseIT.java index 24abdd4b10..b3d3d92aa7 100644 --- a/specs/binding-sse-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/sse/kafka/streams/SseIT.java +++ b/specs/binding-sse-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/sse/kafka/streams/SseIT.java @@ -178,4 +178,13 @@ public void shouldRejectClientSentMessage() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${sse}/server.sent.100k.message/client", + "${sse}/server.sent.100k.message/server"}) + public void shouldReceiveServerSent100kMessage() throws Exception + { + k3po.finish(); + } } From 7fe27713a1fb988c3b2336879749cca07f5f8b13 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 23 Jun 2023 17:49:13 +0500 Subject: [PATCH 4/9] Remove debugging statement --- .../binding/kafka/internal/stream/KafkaMergedFactory.java | 2 +- .../application/merged/merged.fetch.message.values/client.rpt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java index 1f15213e5c..571c602c6b 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java @@ -1335,7 +1335,7 @@ private void onMergedReplyWindow( assert maximum >= replyMax; final int credit = (int)(acknowledge - replyAck) + (maximum - replyMax); - System.out.println(credit); + assert credit >= 0; this.replyAck = acknowledge; this.replyMax = maximum; diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.fetch.message.values/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.fetch.message.values/client.rpt index 05d022ec07..461b3ad7fd 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.fetch.message.values/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.fetch.message.values/client.rpt @@ -15,7 +15,7 @@ # connect "zilla://streams/app0" - option zilla:window 16 + option zilla:window 12 option zilla:transmission "half-duplex" write zilla:begin.ext ${kafka:beginEx() From a763b7f7d93586e13eaa861658ac8b97bad28ad6 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 23 Jun 2023 17:50:01 +0500 Subject: [PATCH 5/9] Remove debugging statement --- .../binding/sse/kafka/internal/stream/SseKafkaProxyFactory.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/runtime/binding-sse-kafka/src/main/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyFactory.java b/runtime/binding-sse-kafka/src/main/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyFactory.java index 8c1fd03597..3c83b39924 100644 --- a/runtime/binding-sse-kafka/src/main/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyFactory.java +++ b/runtime/binding-sse-kafka/src/main/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyFactory.java @@ -635,8 +635,6 @@ private void onKafkaData( replySeq = sequence + reserved; - System.out.println(String.format("Sse budget ondata %d", replyMax - (int)(replySeq - replyAck))); - assert replyAck <= replySeq; if (replySeq > replyAck + replyMax) From ceaa5ed808f43a3bcd551a794d92e0bbad782806 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 23 Jun 2023 18:16:49 +0500 Subject: [PATCH 6/9] Fix typo --- .../application/merged/merged.fetch.message.values/client.rpt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.fetch.message.values/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.fetch.message.values/client.rpt index 461b3ad7fd..05d022ec07 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.fetch.message.values/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.fetch.message.values/client.rpt @@ -15,7 +15,7 @@ # connect "zilla://streams/app0" - option zilla:window 12 + option zilla:window 16 option zilla:transmission "half-duplex" write zilla:begin.ext ${kafka:beginEx() From 6c0ce8d85aa826aa33609c02233fd7598787ecab Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 23 Jun 2023 21:04:31 +0500 Subject: [PATCH 7/9] Applied feedback from PR --- .../sse/kafka/internal/stream/SseKafkaProxyFactory.java | 2 -- .../aklivity/zilla/runtime/engine/EngineConfiguration.java | 2 +- .../zilla/specs/binding/kafka/internal/KafkaFunctions.java | 5 ----- 3 files changed, 1 insertion(+), 8 deletions(-) diff --git a/runtime/binding-sse-kafka/src/main/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyFactory.java b/runtime/binding-sse-kafka/src/main/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyFactory.java index 3c83b39924..1d67ba497b 100644 --- a/runtime/binding-sse-kafka/src/main/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyFactory.java +++ b/runtime/binding-sse-kafka/src/main/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/stream/SseKafkaProxyFactory.java @@ -812,8 +812,6 @@ private void doKafkaWindow( doWindow(kafka, originId, routedId, replyId, replySeq, replyAck, replyMax, traceId, authorization, budgetId, padding, minimum, capabilities); - - System.out.println(String.format("Sse budget doKafkaWindow %d", replyMax - (int)(replySeq - replyAck))); } } diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java index 4353f19dbe..77ba4cffdc 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java @@ -85,7 +85,7 @@ public class EngineConfiguration extends Configuration EngineConfiguration::decodeHostResolver, EngineConfiguration::defaultHostResolver); ENGINE_BUDGETS_BUFFER_CAPACITY = config.property("budgets.buffer.capacity", 1024 * 1024); ENGINE_LOAD_BUFFER_CAPACITY = config.property("load.buffer.capacity", 1024 * 8); - ENGINE_STREAMS_BUFFER_CAPACITY = config.property("streams.buffer.capacity", 2097152); + ENGINE_STREAMS_BUFFER_CAPACITY = config.property("streams.buffer.capacity", 1024 * 1024); ENGINE_COMMAND_BUFFER_CAPACITY = config.property("command.buffer.capacity", 1024 * 1024); ENGINE_RESPONSE_BUFFER_CAPACITY = config.property("response.buffer.capacity", 1024 * 1024); ENGINE_COUNTERS_BUFFER_CAPACITY = config.property("counters.buffer.capacity", 1024 * 1024); diff --git a/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java b/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java index ae3dbf668e..45707cbf77 100644 --- a/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java +++ b/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java @@ -1339,11 +1339,6 @@ public KafkaMergedDataExBuilder header( return this; } - public KafkaMergedDataExBuilder headersNull() - { - return this; - } - public KafkaMergedDataExBuilder headerNull( String name) { From 7309022cba3c65c793a883497fda753ec1b345e0 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 23 Jun 2023 21:06:12 +0500 Subject: [PATCH 8/9] Fix typo --- .../io/aklivity/zilla/runtime/engine/EngineConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java index 77ba4cffdc..6bdc7840ec 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java @@ -85,7 +85,7 @@ public class EngineConfiguration extends Configuration EngineConfiguration::decodeHostResolver, EngineConfiguration::defaultHostResolver); ENGINE_BUDGETS_BUFFER_CAPACITY = config.property("budgets.buffer.capacity", 1024 * 1024); ENGINE_LOAD_BUFFER_CAPACITY = config.property("load.buffer.capacity", 1024 * 8); - ENGINE_STREAMS_BUFFER_CAPACITY = config.property("streams.buffer.capacity", 1024 * 1024); + ENGINE_STREAMS_BUFFER_CAPACITY = config.property("streams.buffer.capacity", 1024 * 1024); ENGINE_COMMAND_BUFFER_CAPACITY = config.property("command.buffer.capacity", 1024 * 1024); ENGINE_RESPONSE_BUFFER_CAPACITY = config.property("response.buffer.capacity", 1024 * 1024); ENGINE_COUNTERS_BUFFER_CAPACITY = config.property("counters.buffer.capacity", 1024 * 1024); From 9a3a1ea4e7137188a3455f16f670b7490fef3fc1 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 23 Jun 2023 21:07:15 +0500 Subject: [PATCH 9/9] Revert back change --- .../zilla/specs/binding/kafka/internal/KafkaFunctions.java | 1 - 1 file changed, 1 deletion(-) diff --git a/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java b/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java index 45707cbf77..a38ebaa231 100644 --- a/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java +++ b/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java @@ -397,7 +397,6 @@ public KafkaFilterBuilder header( String name, String value) { - if (value == null) { nameRO.wrap(name.getBytes(UTF_8));