Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix message fragmentation in sse-kafka and flow control in kafka merge #283

Merged
merged 9 commits into from
Jun 23, 2023

Conversation

akrambek
Copy link
Contributor

@akrambek akrambek commented Jun 23, 2023

Description

Incorrect flow control credit when large message in flight was causing flow control budget to be exceeded when near the flow control limit.

Fixes #281

@akrambek akrambek changed the title Bug/header npe Fix message fragmentation in sse-kafka and flow control in kafka merge Jun 23, 2023
@@ -806,6 +812,8 @@ private void doKafkaWindow(

doWindow(kafka, originId, routedId, replyId, replySeq, replyAck, replyMax,
traceId, authorization, budgetId, padding, minimum, capabilities);

System.out.println(String.format("Sse budget doKafkaWindow %d", replyMax - (int)(replySeq - replyAck)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this System.out before we can merge.

@@ -85,7 +85,7 @@ public class EngineConfiguration extends Configuration
EngineConfiguration::decodeHostResolver, EngineConfiguration::defaultHostResolver);
ENGINE_BUDGETS_BUFFER_CAPACITY = config.property("budgets.buffer.capacity", 1024 * 1024);
ENGINE_LOAD_BUFFER_CAPACITY = config.property("load.buffer.capacity", 1024 * 8);
ENGINE_STREAMS_BUFFER_CAPACITY = config.property("streams.buffer.capacity", 1024 * 1024);
ENGINE_STREAMS_BUFFER_CAPACITY = config.property("streams.buffer.capacity", 2097152);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't change the defaults if avoidable.

If a specific IT requires a different value for this configuration then please override it for just that IT.

@@ -397,6 +397,7 @@ public KafkaFilterBuilder<T> header(
String name,
String value)
{

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this whitespace change.

@jfallows jfallows merged commit 45914b8 into aklivity:develop Jun 23, 2023
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Null pointer when Headers are null
2 participants