Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include kafka client id consistently in all kafka protocol encoders #621

Merged
merged 10 commits into from
Dec 6, 2023
Prev Previous commit
Include kafka client id consistently in all kafka protocol encoders
  • Loading branch information
jfallows committed Dec 6, 2023
commit ab92a2fc7ad5b821f3fd4ecb90ad229297c3ae3b
Original file line number Diff line number Diff line change
Expand Up @@ -1270,7 +1270,7 @@ private void doEncodeDescribeRequest(
.apiKey(DESCRIBE_CONFIGS_API_KEY)
.apiVersion(DESCRIBE_CONFIGS_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -1307,7 +1307,7 @@ private void doEncodeDescribeRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2606,7 +2606,7 @@ private void doEncodeOffsetsRequest(
.apiKey(OFFSETS_API_KEY)
.apiVersion(OFFSETS_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -2645,7 +2645,7 @@ private void doEncodeOffsetsRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

if (KafkaConfiguration.DEBUG)
Expand Down Expand Up @@ -2673,7 +2673,7 @@ private void doEncodeFetchRequest(
.apiKey(FETCH_API_KEY)
.apiVersion(FETCH_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -2712,7 +2712,7 @@ private void doEncodeFetchRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

if (KafkaConfiguration.DEBUG)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@ public final class KafkaClientGroupFactory extends KafkaClientSaslHandshaker imp
private final Long2ObjectHashMap<GroupMembership> instanceIds;
private final Object2ObjectHashMap<String, KafkaGroupStream> groupStreams;
private final Map<String, String> configs;
private final String clientId;
private final Duration rebalanceTimeout;
private final String groupMinSessionTimeoutDefault;
private final String groupMaxSessionTimeoutDefault;
Expand All @@ -291,7 +290,6 @@ public KafkaClientGroupFactory(
{
super(config, context);
this.rebalanceTimeout = config.clientGroupRebalanceTimeout();
this.clientId = config.clientId();
this.supplyInstanceId = config.clientInstanceIdSupplier();
this.kafkaTypeId = context.supplyTypeId(KafkaBinding.NAME);
this.proxyTypeId = context.supplyTypeId("proxy");
Expand Down Expand Up @@ -2123,7 +2121,7 @@ private void doEncodeFindCoordinatorRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down Expand Up @@ -2823,7 +2821,7 @@ private void doEncodeDescribeRequest(
.apiKey(DESCRIBE_CONFIGS_API_KEY)
.apiVersion(DESCRIBE_CONFIGS_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -2860,7 +2858,7 @@ private void doEncodeDescribeRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down Expand Up @@ -3627,7 +3625,7 @@ private void doEncodeJoinGroupRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down Expand Up @@ -3833,7 +3831,7 @@ private void doEncodeSyncGroupRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress.get());
Expand Down Expand Up @@ -3881,7 +3879,7 @@ private void doEncodeHeartbeatRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down Expand Up @@ -3934,7 +3932,7 @@ private void doEncodeLeaveGroupRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1481,7 +1481,7 @@ private void doEncodeMetaRequest(
.apiKey(METADATA_API_KEY)
.apiVersion(METADATA_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand All @@ -1501,7 +1501,7 @@ private void doEncodeMetaRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,7 @@ private void doEncodeOffsetCommitRequest(
.apiKey(OFFSET_COMMIT_API_KEY)
.apiVersion(OFFSET_COMMIT_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -1363,7 +1363,7 @@ private void doEncodeOffsetCommitRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1392,7 +1392,7 @@ private void doEncodeOffsetFetchRequest(
.apiKey(OFFSET_FETCH_API_KEY)
.apiVersion(OFFSET_FETCH_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -1430,7 +1430,7 @@ private void doEncodeOffsetFetchRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ public final class KafkaClientProduceFactory extends KafkaClientSaslHandshaker i
private static final byte RECORD_ATTRIBUTES_NONE = 0;

private static final String TRANSACTION_ID_NONE = null;
private static final String CLIENT_ID_NONE = null;

private static final int TIMESTAMP_NONE = 0;

Expand Down Expand Up @@ -1834,7 +1833,7 @@ private void doEncodeProduceRequest(
.apiKey(PRODUCE_API_KEY)
.apiVersion(PRODUCE_API_VERSION)
.correlationId(0)
.clientId(CLIENT_ID_NONE)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -1902,7 +1901,7 @@ private void doEncodeProduceRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

if (KafkaConfiguration.DEBUG)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaSaslConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration;
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaScramMechanism;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.RequestHeaderFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.ResponseHeaderFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.sasl.SaslAuthenticateRequestFW;
Expand Down Expand Up @@ -94,6 +95,7 @@ public abstract class KafkaClientSaslHandshaker
private Matcher serverResponseMatcher;
private byte[] result, ui, prev;

protected final String16FW clientId;
protected final LongUnaryOperator supplyInitialId;
protected final LongUnaryOperator supplyReplyId;
protected final MutableDirectBuffer writeBuffer;
Expand All @@ -102,6 +104,7 @@ public KafkaClientSaslHandshaker(
KafkaConfiguration config,
EngineContext context)
{
this.clientId = new String16FW(config.clientId());
this.supplyInitialId = context::supplyInitialId;
this.supplyReplyId = context::supplyReplyId;
this.writeBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]);
Expand Down Expand Up @@ -157,7 +160,7 @@ protected final void doEncodeSaslHandshakeRequest(
.apiKey(SASL_HANDSHAKE_API_KEY)
.apiVersion(SASL_HANDSHAKE_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand All @@ -177,7 +180,7 @@ protected final void doEncodeSaslHandshakeRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

if (KafkaConfiguration.DEBUG)
Expand Down Expand Up @@ -212,7 +215,7 @@ private void doEncodeSaslPlainAuthenticateRequest(
.apiKey(SASL_AUTHENTICATE_API_KEY)
.apiVersion(SASL_AUTHENTICATE_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -242,7 +245,7 @@ private void doEncodeSaslPlainAuthenticateRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

if (KafkaConfiguration.DEBUG)
Expand Down Expand Up @@ -270,7 +273,7 @@ private void doEncodeSaslScramFirstAuthenticateRequest(
.apiKey(SASL_AUTHENTICATE_API_KEY)
.apiVersion(SASL_AUTHENTICATE_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -304,7 +307,7 @@ private void doEncodeSaslScramFirstAuthenticateRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

if (KafkaConfiguration.DEBUG)
Expand Down Expand Up @@ -359,7 +362,7 @@ private void doEncodeSaslScramFinalAuthenticateRequest(
.apiKey(SASL_AUTHENTICATE_API_KEY)
.apiVersion(SASL_AUTHENTICATE_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand All @@ -379,7 +382,7 @@ private void doEncodeSaslScramFinalAuthenticateRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ connect "zilla://streams/net0"

connected

write 17 # size
write 22 # size
17s # sasl.handshake
1s # v1
${newRequestId}
-1s # no client id
5s "zilla" # client id
5s "PLAIN" # mechanism

read 17 # size
Expand All @@ -41,11 +41,11 @@ read 17 # size
1 # mechanisms
5s "PLAIN" # PLAIN

write 32 # size
write 37 # size
36s # sasl.authenticate
1s # v1
${newRequestId}
-1s # no client id
5s "zilla" # client id
18
[0x00] "username" # authentication bytes
[0x00] "password"
Expand All @@ -57,11 +57,11 @@ read 20 # size
-1s # authentication bytes
0L # session lifetime

write 41 # size
write 46 # size
32s # describe configs
0s # v0
${newRequestId}
-1s # no client id
5s "zilla" # client id
1 # resources
[0x02] # topic resource
4s "test" # "test" topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ accepted

connected

read 17 # size
read 22 # size
17s # sasl.handshake
1s # v1
(int:requestId)
-1s # no client id
5s "zilla" # client id
5s "PLAIN" # mechanism

write 17 # size
Expand All @@ -38,11 +38,11 @@ write 17 # size
1 # mechanisms
5s "PLAIN" # PLAIN

read 32 # size
read 37 # size
36s # sasl.authenticate
1s # v1
(int:requestId)
-1s # no client id
5s "zilla" # client id
18
[0x00] "username" # authentication bytes
[0x00] "password"
Expand All @@ -54,11 +54,11 @@ write 20 # size
-1s # authentication bytes
0L # session lifetime

read 41 # size
read 46 # size
32s # describe configs
0s # v0
(int:requestId)
-1s # no client id
5s "zilla" # client id
1 # resources
[0x02] # topic resource
4s "test" # "test" topic
Expand Down
Loading